# standard lib imports
import re
from typing import Optional
import logging
# third party imports
import numpy as np
import pandas as pd
from scipy import stats
from tqdm.auto import tqdm
from sklearn.base import BaseEstimator
from sklearn.exceptions import NotFittedError
log = logging.getLogger(__name__)
class CategoricalDataProcessor(BaseEstimator):
    """Regroups the categories of categorical variables based on significance
    with the target variable.

    This class implements the Python Prediction's way of dealing with
    categorical data preprocessing. There are three steps involved:

    - An optional regrouping of the different categories based on category
      size and significance of the category w.r.t. the target.

      - For a given categorical variable, all categories below the (weighted)
        category size threshold are put into a rest category (by default
        ``Other``).
      - The remaining categories are subject to a statistical test: if there
        is sufficient dependence with the target variable compared to all
        other categories, the category is kept as-is, otherwise it is also
        put into the rest category.
      - Beware: one can force categories to be kept, and if no single
        category passes the statistical test, the categorical variable is
        left unprocessed altogether.

    - Missing value replacement with the additional category ``Missing``.
    - Change of dtype to ``category`` (could potentially lead to memory
      optimization).

    See the README of the GitHub repository for more methodological
    background information.

    Attributes
    ----------
    category_size_threshold : int
        All categories with a size (corrected for incidence if applicable)
        in the training set above this threshold are kept as a separate
        category, if statistical significance w.r.t. target is detected.
        Remaining categories are converted into ``Other`` (or else,
        cf. regroup_name).
    forced_categories : dict
        Map to prevent certain categories from being grouped into ``Other``
        for each column - dict of the form ``{col:[forced vars]}``.
    keep_missing : bool
        Whether or not to keep missing as a separate category.
    model_type : str
        Model type (``classification`` or ``regression``).
    p_value_threshold : float
        Significance threshold for regrouping.
    regroup : bool
        Whether or not to regroup categories.
    regroup_name : str
        New name of the non-significant regrouped variables.
    scale_contingency_table : bool
        Whether contingency table should be scaled before chi^2.
    """

    # Keys accepted by set_attributes_from_dict (guards set_params input).
    valid_keys = ["model_type", "regroup", "regroup_name", "keep_missing",
                  "category_size_threshold", "p_value_threshold",
                  "scale_contingency_table", "forced_categories"]

    def __init__(self,
                 model_type: str = "classification",
                 regroup: bool = True,
                 regroup_name: str = "Other",
                 keep_missing: bool = True,
                 category_size_threshold: int = 5,
                 p_value_threshold: float = 0.001,
                 scale_contingency_table: bool = True,
                 forced_categories: Optional[dict] = None):
        """Initialize the processor and validate ``model_type``.

        Raises
        ------
        ValueError
            If ``model_type`` is neither ``classification`` nor
            ``regression``.
        """
        if model_type not in ["classification", "regression"]:
            raise ValueError("An unexpected model_type was provided. A valid model_type is either 'classification' or 'regression'.")

        self.model_type = model_type
        self.regroup = regroup
        self.regroup_name = regroup_name
        self.keep_missing = keep_missing
        self.category_size_threshold = category_size_threshold
        self.p_value_threshold = p_value_threshold
        self.scale_contingency_table = scale_contingency_table
        # Fix for the mutable-default-argument pitfall: the original
        # ``forced_categories: dict={}`` default is shared across ALL
        # instances, so mutating it on one instance leaks into others.
        self.forced_categories = {} if forced_categories is None else forced_categories

        # dict to store fitted output in
        self._cleaned_categories_by_column = {}
[docs] def attributes_to_dict(self) -> dict:
"""Return the attributes of CategoricalDataProcessor as a dictionary.
Returns
-------
dict
Contains the attributes of CategoricalDataProcessor instance with
the attribute name as key.
"""
params = self.get_params()
params["_cleaned_categories_by_column"] = {
key: list(value)
for key, value in self._cleaned_categories_by_column.items()
}
return params
[docs] def set_attributes_from_dict(self, params: dict):
"""Set instance attributes from a dictionary of values with key the
name of the attribute.
Parameters
----------
params : dict
Contains the attributes of CategoricalDataProcessor with their
names as key.
Raises
------
ValueError
In case _cleaned_categories_by_column is not of type dict.
"""
_fitted_output = params.pop("_cleaned_categories_by_column", {})
if type(_fitted_output) != dict:
raise ValueError("_cleaned_categories_by_column is expected to "
"be a dict but is of type {} instead"
.format(type(_fitted_output)))
# Clean out params dictionary to remove unknown keys (for safety!)
params = {key: params[key] for key in params if key in self.valid_keys}
# We cannot turn this method into a classmethod as we want to make use
# of the following method from BaseEstimator:
self.set_params(**params)
self._cleaned_categories_by_column = {
key: set(value) for key, value in _fitted_output.items()
}
return self
[docs] def fit(self, data: pd.DataFrame, column_names: list,
target_column: str):
"""Fit the CategoricalDataProcessor.
Parameters
----------
data : pd.DataFrame
Data used to compute the mapping to encode the categorical
variables with.
column_names : list
Columns of data to be processed.
target_column : str
Column name of the target.
"""
if not self.regroup:
# We do not need to fit anything if regroup is set to False!
log.info("regroup was set to False, so no fitting is required")
return None
for column_name in tqdm(column_names, desc="Fitting category "
"regrouping..."):
if column_name not in data.columns:
log.warning("DataFrame has no column '{}', so it will be "
"skipped in fitting" .format(column_name))
continue
cleaned_cats = self._fit_column(data, column_name, target_column)
# Remove forced categories
forced_cats = self.forced_categories.get(column_name, set())
cleaned_cats = cleaned_cats.union(forced_cats)
# Add to _cleaned_categories_by_column for later use
self._cleaned_categories_by_column[column_name] = cleaned_cats
def _fit_column(self, data: pd.DataFrame, column_name: str,
target_column) -> set:
"""Compute which categories to regroup into "Other"
for a particular column, and return those that need
to be kept as-is.
Parameters
----------
data : pd.DataFrame
Description
column_name : str
Description
Returns
-------
list
List of categories to combine into a category "Other".
"""
model_type = self.model_type
if len(data[column_name].unique()) == 1:
log.warning(f"Predictor {column_name} is constant"
" and will be ignored in computation.")
return set(data[column_name].unique())
y = data[target_column]
if model_type == "classification":
incidence = y.mean()
else:
incidence = None
combined_categories = set()
# replace missings and get unique categories as a list
X = (CategoricalDataProcessor
._replace_missings(data[column_name])
.astype(object))
unique_categories = list(X.unique())
# do not merge categories in case of dummies, i.e. 0 and 1
# (and possibly "Missing")
if (len(unique_categories) == 2
or (len(unique_categories) == 3
and "Missing" in unique_categories)):
return set(unique_categories)
# get small categories and add them to the merged category list
# does not apply incidence factor when model_type = "regression"
small_categories = (CategoricalDataProcessor
._get_small_categories(
X,
incidence,
self.category_size_threshold))
combined_categories = combined_categories.union(small_categories)
for category in unique_categories:
if category in small_categories:
continue
pval = (CategoricalDataProcessor
._compute_p_value(X, y, category,
model_type,
self.scale_contingency_table))
# if not significant, add it to the list
if pval > self.p_value_threshold:
combined_categories.add(category)
# Remove missing category from combined_categories if required
if self.keep_missing:
combined_categories.discard("Missing")
return set(unique_categories).difference(combined_categories)
def _transform_column(self, data: pd.DataFrame,
column_name: str) -> pd.DataFrame:
"""Given a DataFrame, a column name and a list of categories to
combine, create an additional column which combines these categories
into "Other".
Parameters
----------
data : pd.DataFrame
Original data to be transformed.
column_name : str
Name of the column to transform.
Returns
-------
pd.DataFrame
Original DataFrame with an added processed column.
"""
column_name_clean = column_name + "_processed"
data.loc[:, column_name_clean] = data[column_name].astype(object)
# Fill missings first
data.loc[:, column_name_clean] = (CategoricalDataProcessor
._replace_missings(
data,
column_name_clean
))
if self.regroup:
categories = self._cleaned_categories_by_column.get(column_name)
if not categories:
# Log warning if categories is None, which indicates it is
# not in fitted output
if categories is None:
log.warning("Column '{}' is not in fitted output "
"and will be skipped".format(column_name))
return data
data.loc[:, column_name_clean] = (CategoricalDataProcessor
._replace_categories(
data[column_name_clean],
categories,
self.regroup_name))
# change data to categorical
data.loc[:, column_name_clean] = (data[column_name_clean]
.astype("category"))
return data
@staticmethod
def _get_small_categories(predictor_series: pd.Series,
incidence: float,
category_size_threshold: int) -> set:
"""Fetch categories with a size below a certain threshold.
Note that we use an additional weighting with the overall incidence.
Parameters
----------
predictor_series : pd.Series
Variables data.
incidence : float
Global train incidence.
category_size_threshold : int
Minimal size of a category to keep as a separate category.
Returns
-------
set
List a categories with a count below a certain threshold.
"""
category_counts = predictor_series.groupby(predictor_series).size()
if incidence is not None:
factor = max(incidence, 1 - incidence)
else:
factor = 1
# Get all categories with a count below a threshold
bool_mask = (category_counts*factor) <= category_size_threshold
return set(category_counts[bool_mask].index.tolist())
@staticmethod
def _replace_missings(data: pd.DataFrame,
column_names: Optional[list] = None) -> pd.DataFrame:
"""Replace missing values (incl. empty strings).
Parameters
----------
data : pd.DataFrame
Data to replace missings in.
column_names: list, optional
List of predictors to replace missings in.
Returns
-------
list
List of unique values in the data.
"""
# replace missings (incl. empty string)
regex = re.compile("^\\s+|\\s+$")
temp = None
if column_names:
temp = data[column_names]
else:
temp = data.copy()
temp = temp.fillna("Missing")
temp = temp.replace(regex, "")
temp = temp.replace("", "Missing")
return temp
@staticmethod
def _compute_p_value(X: pd.Series, y: pd.Series, category: str,
model_type: str,
scale_contingency_table: bool) -> float:
"""Calculates p-value in order to evaluate whether category of
interest is significantly different from the rest of the
categories, given the target variable.
In case model_type is "classification", chi-squared test based on a contingency table.
In case model_type is "regression", Kruskal-Wallis test.
Parameters
----------
X : pd.Series
Variables data.
y : pd.Series
Target data.
category : str
Category for which we carry out the test.
model_type : str
Model type (``classification`` or ``regression``).
scale_contingency_table : bool
Whether we scale contingency table with incidence rate.
Only used when model_type = "classification".
Returns
-------
float
The p-value of applied statistical test.
"""
df = pd.concat([X, y], axis=1)
df.columns = ["X", "y"]
df["other_categories"] = np.where(X == category, 0, 1)
if model_type == "classification":
contingency_table = pd.crosstab(index=df["other_categories"], columns=df["y"],
margins=False)
# if true, we scale the "other" categories
if scale_contingency_table:
size_other_cats = contingency_table.iloc[1].sum()
incidence_mean = y.mean()
contingency_table.iloc[1, 0] = (1-incidence_mean) * size_other_cats
contingency_table.iloc[1, 1] = incidence_mean * size_other_cats
contingency_table = contingency_table.values.astype(np.int64)
pval = stats.chi2_contingency(contingency_table, correction=False)[1]
elif model_type == "regression":
pval = stats.kruskal(df.y[df.other_categories == 0],
df.y[df.other_categories == 1])[1]
return pval
@staticmethod
def _replace_categories(data: pd.Series, categories: set,
replace_with: str) -> pd.Series:
"""Replace categories in set with "Other" and transform the remaining
categories to strings to avoid type errors later on in the pipeline.
Parameters
----------
data : pd.Series
Dataset which contains the variable to be replaced.
categories : set
Cleaned categories.
replace_with: str
String to be used as replacement for category.
Returns
-------
pd.Series
Series with replaced categories.
"""
return data.apply(
lambda x: str(x) if x in categories else replace_with)