import logging
from typing import Callable, Optional
import pandas as pd
from tqdm.auto import tqdm
from cobra.model_building import LogisticRegressionModel, LinearRegressionModel
log = logging.getLogger(__name__)
[docs]class ForwardFeatureSelection:
    """Perform forward feature selection for a given dataset using a given
    algorithm.
    Predictors are sequentially added to the model, starting with the one that
    has the highest univariate predictive power, and then proceeding with those that
    jointly lead to the best fit, optimizing for selection AUC or RMSE. Interaction
    effects are not explicitly modeled, yet they are implicitly present given the
    feature selection and the underlying feature correlation structure.
    Attributes
    ----------
    model_type : str
        Model type (``classification`` or ``regression``).
    MLModel: Cobra model
        LogisticRegressionModel or LinearRegressionModel.
    max_predictors : int
        Maximum number of predictors allowed in any model. This corresponds
        more or less with the maximum number of steps in the forward feature
        selection.
    pos_only : bool
        Whether or not the model coefficients should all be positive (no sign flips).
    self._fitted_models : list
        List of fitted models.
    """
    def __init__(self,
                 model_type: str="classification",
                 max_predictors: int=50,
                 pos_only: bool=True):
        self.model_type = model_type
        if model_type == "classification":
            self.MLModel = LogisticRegressionModel
        elif model_type == "regression":
            self.MLModel = LinearRegressionModel
        self.max_predictors = max_predictors
        self.pos_only = pos_only
        self._fitted_models = []
[docs]    def get_model_from_step(self, step: int):
        """Get fitted model from a particular step.
        Parameters
        ----------
        step : int
            Particular step in the forward selection.
        Returns
        -------
        self.MLModel
            Fitted model from the given step.
        Raises
        ------
        ValueError
            In case step is larger than the number of available models.
        """
        if len(self._fitted_models) <= step:
            raise ValueError(f"No model available for step {step}. "
                             "The first step starts from index 0.")
        return self._fitted_models[step] 
[docs]    def fit(self, train_data: pd.DataFrame, target_column_name: str,
            predictors: list, forced_predictors: list=[],
            excluded_predictors: list=[]):
        """Fit the forward feature selection estimator.
        Parameters
        ----------
        train_data : pd.DataFrame
            Data on which to fit the model. Should include a "train"
            and "selection" split for correct model selection! The
            "train" split is used to train a model, the "selection"
            split is used to evaluate which model to include in the
            actual forward feature selection.
        target_column_name : str
            Name of the target column.
        predictors : list
            List of predictors on which to train the estimator.
        forced_predictors : list, optional
            List of predictors to force in the estimator.
        excluded_predictors : list, optional
            List of predictors to exclude from the estimator.
        Raises
        ------
        ValueError
            In case the number of forced predictors is larger than the maximum
            number of allowed predictors in the model.
        """
        assert "split" in train_data.columns, "The train_data input df does not include a split column."
        assert len(set(["train", "selection"]).difference(set(train_data["split"].unique()))) == 0, \
            
"The train_data input df does not include a 'train' and 'selection' split."
        # remove excluded predictors from predictor lists
        filtered_predictors = [var for var in predictors
                               if (var not in excluded_predictors and
                                   var not in forced_predictors)]
        # checks on predictor lists and self.max_predictors attr
        if len(forced_predictors) > self.max_predictors:
            raise ValueError("Size of forced_predictors cannot be bigger than "
                             "max_predictors.")
        elif len(forced_predictors) == self.max_predictors:
            log.info("Size of forced_predictors equals max_predictors "
                     "only one model will be trained...")
            # train model with all forced_predictors (only)
            (self._fitted_models
             .append(self._train_model(train_data[train_data["split"] == "train"],
                                       target_column_name,
                                       forced_predictors)))
        else:
            self._fitted_models = self._forward_selection(train_data,
                                                          target_column_name,
                                                          filtered_predictors,
                                                          forced_predictors) 
    def _forward_selection(self,
                           train_data: pd.DataFrame,
                           target_column_name: str,
                           predictors: list,
                           forced_predictors: list = []) -> list:
        """Perform the forward feature selection algorithm to compute a list
        of models (with increasing performance). The length of the list,
        i.e. the number of models, is bounded by the max_predictors class
        attribute.
        Parameters
        ----------
        train_data : pd.DataFrame
            Data on which to fit the model.
        target_column_name : str
            Name of the target column.
        predictors : list
            List of predictors on which to train the models.
        forced_predictors : list, optional
            List of predictors to force in the models.
        Returns
        -------
        list
            List of fitted models where the index of the list indicates the
            number of predictors minus one (as indices start from 0).
        """
        fitted_models = []
        current_predictors = []
        max_steps = 1 + min(self.max_predictors,
                            len(predictors) + len(forced_predictors))
        for step in tqdm(range(1, max_steps), desc="Sequentially adding best "
                                                   "predictor..."):
            if step <= len(forced_predictors):
                # first, we go through the forced predictors
                candidate_predictors = [var for var in forced_predictors
                                        if var not in current_predictors]
            else:
                candidate_predictors = [var for var in (predictors
                                                        + forced_predictors)
                                        if var not in current_predictors]
            model = self._find_next_best_model(train_data,
                                               target_column_name,
                                               candidate_predictors,
                                               current_predictors)
            if model is not None:
                # Add new model predictors to the list of current predictors
                current_predictors = list(set(current_predictors)
                                          .union(set(model.predictors)))
                fitted_models.append(model)
            # else:
            #     # If model returns None for the first time,
            #     # one can in theory stop the feature selection process
            #     # but we leave it run such that tqdm cleanly finishes
            #     break
        if not fitted_models:
            log.error("No models found in forward selection.")
        return fitted_models
    def _find_next_best_model(self,
                              train_data: pd.DataFrame,
                              target_column_name: str,
                              candidate_predictors: list,
                              current_predictors: list):
        """Given a list of current predictors which are already selected to
        be include in the model, find amongst a list candidate predictors
        the predictor to add to the selected list so that the resulting model
        has the best performance.
        Parameters
        ----------
        train_data : pd.DataFrame
            Data on which to fit the model.
        target_column_name : str
            Name of the target column.
        candidate_predictors : list
            List of candidate predictors to test.
        current_predictors : list
            List of predictors on which to train the models.
        Returns
        -------
        self.MLModel
            Best performing model.
        """
        # placeholders
        best_model = None
        if self.MLModel == LogisticRegressionModel:
            best_performance = -1  # AUC metric is used
        elif self.MLModel == LinearRegressionModel:
            best_performance = float("inf")  # RMSE metric is used
        else:
            raise ValueError("No metric comparison method has been configured "
                             "for the given model_type specified as "
                             "ForwardFeatureSelection argument.")
        fit_data = train_data[train_data["split"] == "train"]  # data to fit the models with
        sel_data = train_data[train_data["split"] == "selection"]  # data to compare the models with
        for pred in candidate_predictors:
            # Train a model with an additional predictor
            model = self._train_model(fit_data, target_column_name,
                                      (current_predictors + [pred]))
            # Evaluate the model
            performance = (model
                           .evaluate(sel_data[current_predictors + [pred]],
                                     sel_data[target_column_name],
                                     split="selection"))
            if self.pos_only and (not (model.get_coef() >= 0).all()):
                continue
            # Check if the model is better than the current best model
            # and if it is, replace the current best.
            if self.MLModel == LogisticRegressionModel \
                    
and performance > best_performance:  # AUC metric is used
                best_performance = performance
                best_model = model
            elif self.MLModel == LinearRegressionModel \
                    
and performance < best_performance:  # RMSE metric is used
                best_performance = performance
                best_model = model
        return best_model
    def _train_model(self, train_data: pd.DataFrame, target_column_name: str,
                     predictors: list):
        """Train the model with a given set of predictors.
        Parameters
        ----------
        train_data : pd.DataFrame
            Data on which to fit the model.
        target_column_name : str
            Name of the target column.
        predictors : list
            List of predictors on which to train the models.
        Returns
        -------
        self.MLModel
            Trained model.
        """
        model = self.MLModel()
        model.fit(train_data[predictors], train_data[target_column_name])
        return model