# standard lib imports
import inspect
import time
import math
import logging
from random import shuffle
from datetime import datetime
# third party imports
import pandas as pd
from sklearn.base import BaseEstimator
from sklearn.exceptions import NotFittedError
# custom imports
from cobra.preprocessing import CategoricalDataProcessor
from cobra.preprocessing import KBinsDiscretizer
from cobra.preprocessing import TargetEncoder
log = logging.getLogger(__name__)
class PreProcessor(BaseEstimator):
"""This class implements a so-called facade pattern to define a
higher-level interface to work with the CategoricalDataProcessor,
KBinsDiscretizer and TargetEncoder classes, so that their fit and transform
methods are called in the correct order.
Additionally, it provides methods for (de)serialization to/from JSON
so that preprocessing pipelines can be stored and reloaded, e.g. for scoring.
We refer to the README of the GitHub repository for more background information
on the preprocessing methodology.
Attributes
----------
categorical_data_processor : CategoricalDataProcessor
Instance of CategoricalDataProcessor to do the preprocessing of
categorical variables.
discretizer : KBinsDiscretizer
Instance of KBinsDiscretizer to do the preprocessing of continuous
variables by means of discretization.
target_encoder : TargetEncoder
Instance of TargetEncoder to do the incidence replacement.
is_fitted : bool
Whether or not the pipeline has already been fitted.
model_type : str
The model_type variable as specified in CategoricalDataProcessor
(``classification`` or ``regression``).
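Examples
--------
A minimal sketch of the intended flow; the column names and data below
are illustrative assumptions, not part of the API:
>>> import numpy as np
>>> import pandas as pd
>>> train_data = pd.DataFrame({
...     "age": np.arange(100),
...     "color": ["red", "green", "blue", "green"] * 25,
...     "target": [0, 1] * 50,
... })
>>> preprocessor = PreProcessor.from_params(model_type="classification")
>>> preprocessor.fit(train_data, continuous_vars=["age"],
...                  discrete_vars=["color"],
...                  target_column_name="target")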
"""
def __init__(self,
categorical_data_processor: CategoricalDataProcessor,
discretizer: KBinsDiscretizer,
target_encoder: TargetEncoder,
is_fitted: bool = False):
self._categorical_data_processor = categorical_data_processor
self._discretizer = discretizer
self._target_encoder = target_encoder
self._is_fitted = is_fitted
self.model_type = categorical_data_processor.model_type
@classmethod
def from_params(cls,
model_type: str = "classification",
n_bins: int = 10,
strategy: str = "quantile",
closed: str = "right",
auto_adapt_bins: bool = False,
starting_precision: int = 0,
label_format: str = "{} - {}",
change_endpoint_format: bool = False,
regroup: bool = True,
regroup_name: str = "Other",
keep_missing: bool = True,
category_size_threshold: int = 5,
p_value_threshold: float = 0.001,
scale_contingency_table: bool = True,
forced_categories: dict = None,
weight: float = 0.0,
imputation_strategy: str = "mean"):
"""Constructor to instantiate PreProcessor from all the parameters
that can be set in all its required (attribute) classes
along with good default values.
Parameters
----------
model_type : str
Model type (``classification`` or ``regression``).
n_bins : int, optional
Number of bins to produce. Raises ValueError if ``n_bins < 2``.
strategy : str, optional
Binning strategy. Currently only ``uniform`` and ``quantile``
(i.e. equifrequency) binning are supported.
closed : str, optional
Whether to close the bins (intervals) from the left or right.
auto_adapt_bins : bool, optional
Reduces the number of bins (starting from n_bins) as a function of
the number of missing values.
starting_precision : int, optional
Initial precision for the bin edges to start from,
can also be negative. Given a list of bin edges, the class will
automatically choose the minimal precision required to have proper
bins e.g. ``[5.5555, 5.5744, ...]`` will be rounded
to ``[5.56, 5.57, ...]``. In case of a negative number, an attempt
will be made to round the bin edges to multiples of a power of ten,
e.g. ``5.55 -> 10``, ``146 -> 100``, ...
label_format : str, optional
Format string to display the bin labels
e.g. ``min - max``, ``(min, max]``, ...
change_endpoint_format : bool, optional
Whether or not to change the format of the lower and upper bins
into ``< x`` and ``> y`` resp.
regroup : bool
Whether or not to regroup categories.
regroup_name : str
Name to assign to the regrouped non-significant categories.
keep_missing : bool
Whether or not to keep missing as a separate category.
category_size_threshold : int
All categories with a size (corrected for incidence if applicable)
in the training set above this threshold are kept as a separate category,
provided statistical significance w.r.t. the target is detected. The
remaining categories are grouped into ``Other`` (or into ``regroup_name``,
if set).
p_value_threshold : float
Significance threshold for regrouping.
scale_contingency_table : bool
Whether the contingency table should be scaled before the chi-squared test.
forced_categories : dict, optional
Map to prevent certain categories from being grouped into ``Other``
for each column - dict of the form ``{col: [forced categories]}``.
Defaults to ``None``, meaning no categories are forced.
weight : float, optional
Smoothing parameter (non-negative). The higher the value of the
parameter, the bigger the contribution of the overall mean.
When set to zero, there is no smoothing (i.e. the pure target
incidence is used).
imputation_strategy : str, optional
In case a column contains categories at scoring time that were not
seen during fit, the encoding will lead to NULL values which should
be imputed. Valid strategies are to replace them with the global mean
of the train set or the min (resp. max) incidence of the categories
of that particular variable.
Returns
-------
PreProcessor
Class encapsulating CategoricalDataProcessor,
KBinsDiscretizer, and TargetEncoder instances.
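Examples
--------
A sketch overriding a few defaults; the chosen values are purely
illustrative:
>>> preprocessor = PreProcessor.from_params(model_type="regression",
...                                         n_bins=5,
...                                         weight=0.5)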
"""
categorical_data_processor = CategoricalDataProcessor(
model_type,
regroup,
regroup_name,
keep_missing,
category_size_threshold,
p_value_threshold,
scale_contingency_table,
forced_categories or {})  # fall back to an empty dict for the None default
discretizer = KBinsDiscretizer(n_bins, strategy, closed,
auto_adapt_bins,
starting_precision,
label_format,
change_endpoint_format)
target_encoder = TargetEncoder(weight, imputation_strategy)
return cls(categorical_data_processor, discretizer, target_encoder)
@classmethod
def from_pipeline(cls, pipeline: dict):
"""Constructor to instantiate PreProcessor from a (fitted) pipeline
which was stored as a JSON file and passed to this function as a dict.
Parameters
----------
pipeline : dict
The (fitted) pipeline as a dictionary.
Returns
-------
PreProcessor
Instance of PreProcessor instantiated from a stored pipeline.
Raises
------
ValueError
If the loaded pipeline does not have all required parameters
and no others.
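Examples
--------
A sketch assuming the pipeline was previously stored with
``serialize_pipeline``; the file name is illustrative:
>>> import json
>>> with open("pipeline.json", "r") as file:
...     pipeline = json.load(file)
>>> preprocessor = PreProcessor.from_pipeline(pipeline)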
"""
if not PreProcessor._is_valid_pipeline(pipeline):
raise ValueError("Invalid pipeline, as it does not "
"contain all and only the required parameters.")
categorical_data_processor = CategoricalDataProcessor()
categorical_data_processor.set_attributes_from_dict(
pipeline["categorical_data_processor"]
)
discretizer = KBinsDiscretizer()
discretizer.set_attributes_from_dict(pipeline["discretizer"])
target_encoder = TargetEncoder()
target_encoder.set_attributes_from_dict(pipeline["target_encoder"])
return cls(categorical_data_processor, discretizer, target_encoder,
is_fitted=pipeline["_is_fitted"])
def fit(self, train_data: pd.DataFrame, continuous_vars: list,
discrete_vars: list, target_column_name: str):
"""Fit the data to the preprocessing pipeline.
Parameters
----------
train_data : pd.DataFrame
Data to be preprocessed.
continuous_vars : list
List of continuous variables.
discrete_vars : list
List of discrete variables.
target_column_name : str
Column name of the target.
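Examples
--------
A sketch assuming ``train_data`` is a pandas DataFrame containing the
illustrative columns below as well as a binary target:
>>> preprocessor = PreProcessor.from_params(model_type="classification")
>>> preprocessor.fit(train_data,
...                  continuous_vars=["age", "income"],
...                  discrete_vars=["color"],
...                  target_column_name="target")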
"""
# get list of all variables
preprocessed_variable_names = PreProcessor._get_variable_list(
continuous_vars, discrete_vars)
log.info("Starting to fit pipeline")
start = time.time()
# Ensure to operate on separate copy of data
train_data = train_data.copy()
# Fit discretizer, categorical preprocessor & target encoder
# Note that in order to fit target_encoder, we first have to transform
# the data using the fitted discretizer & categorical_data_processor
if continuous_vars:
begin = time.time()
self._discretizer.fit(train_data, continuous_vars)
log.info("Fitting KBinsDiscretizer took {} seconds"
.format(time.time() - begin))
train_data = self._discretizer.transform(train_data,
continuous_vars)
if discrete_vars:
begin = time.time()
self._categorical_data_processor.fit(train_data,
discrete_vars,
target_column_name)
log.info("Fitting categorical_data_processor class took {} seconds"
.format(time.time() - begin))
train_data = (self._categorical_data_processor
.transform(train_data, discrete_vars))
begin = time.time()
self._target_encoder.fit(train_data, preprocessed_variable_names,
target_column_name)
log.info("Fitting TargetEncoder took {} seconds"
.format(time.time() - begin))
self._is_fitted = True # set fitted boolean to True
log.info("Fitting pipeline took {} seconds"
.format(time.time() - start))
@staticmethod
def train_selection_validation_split(data: pd.DataFrame,
train_prop: float = 0.6,
selection_prop: float = 0.2,
validation_prop: float = 0.2) -> pd.DataFrame:
"""Adds `split` column with train/selection/validation values
to the dataset.
Train set = data on which the model is trained and on which the encoding is based.
Selection set = data used for univariate and forward feature selection. Often called the validation set.
Validation set = data that generates the final performance metrics. Often called the test set.
Parameters
----------
data : pd.DataFrame
Input dataset to split into train-selection and validation sets.
train_prop : float, optional
Percentage data to put in train set.
selection_prop : float, optional
Percentage data to put in selection set.
validation_prop : float, optional
Percentage data to put in validation set.
Returns
-------
pd.DataFrame
DataFrame with additional split column.
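Examples
--------
A sketch assuming ``basetable`` is a pandas DataFrame; the proportions
are illustrative:
>>> basetable = PreProcessor.train_selection_validation_split(
...     basetable, train_prop=0.7, selection_prop=0.2,
...     validation_prop=0.1)
>>> basetable["split"].value_counts()  # roughly 70/20/10 of the rows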
"""
if not math.isclose(train_prop + selection_prop + validation_prop, 1.0):
raise ValueError("The sum of train_prop, selection_prop and "
"validation_prop must be 1.0.")
if train_prop == 0.0:
raise ValueError("train_prop cannot be zero!")
if selection_prop == 0.0:
raise ValueError("selection_prop cannot be zero!")
nrows = data.shape[0]
size_train = int(train_prop * nrows)
size_select = int(selection_prop * nrows)
size_valid = int(validation_prop * nrows)
# Assign any rows left over due to rounding to the train set
correction = nrows - (size_train + size_select + size_valid)
split = ['train'] * (size_train + correction) \
+ ['selection'] * size_select \
+ ['validation'] * size_valid
shuffle(split)
data['split'] = split
return data
def serialize_pipeline(self) -> dict:
"""Serialize the preprocessing pipeline by writing all its required
parameters to a dictionary to later store it as a JSON file.
Returns
-------
dict
Return the pipeline as a dictionary.
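Examples
--------
A sketch of persisting a fitted pipeline; the file name is an
illustrative choice:
>>> import json
>>> pipeline = preprocessor.serialize_pipeline()
>>> with open("pipeline.json", "w") as file:
...     json.dump(pipeline, file)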
"""
pipeline = {
"metadata": {
"timestamp": datetime.now().strftime("%d/%m/%Y %H:%M:%S")
}
}
pipeline["categorical_data_processor"] = (self
._categorical_data_processor
.attributes_to_dict())
pipeline["discretizer"] = self._discretizer.attributes_to_dict()
pipeline["target_encoder"] = (self._target_encoder
.attributes_to_dict())
pipeline["_is_fitted"] = True
return pipeline
@staticmethod
def _is_valid_pipeline(pipeline: dict) -> bool:
"""Validate the loaded pipeline by checking if all required parameters
are present (and no others!).
Parameters
----------
pipeline : dict
Loaded pipeline from JSON file.
"""
keys = inspect.getfullargspec(PreProcessor.from_params).args
valid_keys = {key for key in keys if key != "cls"}
input_keys = set()
for key in pipeline:
if key in ["categorical_data_processor", "discretizer",
"target_encoder"]:
input_keys = input_keys.union(set(pipeline[key].keys()))
elif key != "metadata":
input_keys.add(key)
# Private attributes such as _is_fitted are serialized as well, but
# they are not constructor parameters, so ignore them here
input_keys = {key for key in input_keys if not key.startswith("_")}
return valid_keys == input_keys
@staticmethod
def _get_variable_list(continuous_vars: list, discrete_vars: list) -> list:
"""Merge lists of continuous_vars and discrete_vars and add suffix
"_bin" resp. "_processed" to the predictors.
Parameters
----------
continuous_vars : list
List of continuous variables.
discrete_vars : list
List of discrete variables.
Returns
-------
list
Merged list of predictors with proper suffixes added.
Raises
------
ValueError
In case both lists are empty.
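Examples
--------
>>> PreProcessor._get_variable_list(["age"], ["color"])
['color_processed', 'age_bin']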
"""
var_list = ([col + "_processed" for col in discrete_vars]
+ [col + "_bin" for col in continuous_vars])
if not var_list:
raise ValueError("Both continuous_vars and discrete_vars are empty; "
"at least one predictor is required.")
return var_list