# standard lib imports
from copy import deepcopy
from typing import List
import numbers
import logging
import math
# third party imports
import numpy as np
import pandas as pd
from tqdm.auto import tqdm
from sklearn.base import BaseEstimator
from sklearn.exceptions import NotFittedError
log = logging.getLogger(__name__)
class KBinsDiscretizer(BaseEstimator):
    """Bin continuous data into intervals of predefined size.

    It provides a way to partition continuous data into discrete values,
    i.e. transform continuous data into nominal data. This can make a linear
    model more expressive as it introduces nonlinearity to the model, while
    maintaining the interpretability of the model afterwards.

    This module is a rework of
    https://github.com/scikit-learn/scikit-learn/blob/master/sklearn/preprocessing/_discretization.py,
    though it is purely written in pandas instead of numpy because it is more
    intuitive. It also includes some custom modifications to align it with
    the Python Predictions methodology. See the README of the GitHub
    repository for more background information.

    Attributes
    ----------
    auto_adapt_bins : bool
        Reduces the number of bins (starting from n_bins) as a function of
        the number of missings.
    change_endpoint_format : bool
        Whether or not to change the format of the lower and upper bins into
        ``<= x`` and ``> y`` resp.
    closed : str
        Whether to close the bins (intervals) from the left or right.
    label_format : str
        Format string to display the bin labels
        e.g. ``min - max``, ``(min, max]``, ...
    n_bins : int
        Number of bins to produce. Raises ValueError if ``n_bins < 2``. A
        warning is issued when a variable can only produce a lower number of
        bins than asked for.
    starting_precision : int
        Initial precision for the bin edges to start from,
        can also be negative. Given a list of bin edges, the class will
        automatically choose the minimal precision required to have proper
        bins e.g. ``[5.5555, 5.5744, ...]`` will be rounded to
        ``[5.56, 5.57, ...]``. In case of a negative number, an attempt will
        be made to round up the numbers of the bin edges e.g. ``5.55 -> 10``,
        ``146 -> 100``, ...
    strategy : str
        Binning strategy. Currently only ``uniform`` and ``quantile``
        (i.e. equifrequency) are supported.
    """

    valid_strategies = ("uniform", "quantile")
    valid_keys = ["n_bins", "strategy", "closed", "auto_adapt_bins",
                  "starting_precision", "label_format",
                  "change_endpoint_format"]

    def __init__(self, n_bins: int = 10, strategy: str = "quantile",
                 closed: str = "right",
                 auto_adapt_bins: bool = False,
                 starting_precision: int = 0,
                 label_format: str = "{} - {}",
                 change_endpoint_format: bool = False):
        # validate number of bins upfront so misconfiguration fails fast
        self._validate_n_bins(n_bins)
        self.n_bins = n_bins
        # normalize to lower case so e.g. "Quantile" is accepted as well
        self.strategy = strategy.lower()
        self.closed = closed.lower()
        self.auto_adapt_bins = auto_adapt_bins
        self.starting_precision = starting_precision
        self.label_format = label_format
        self.change_endpoint_format = change_endpoint_format
        # dict to store fitted output in: column name -> list of bin tuples,
        # or None for columns that were skipped during fitting
        self._bins_by_column = {}
def _validate_n_bins(self, n_bins: int):
"""Check if ``n_bins`` is of the proper type and if it is bigger
than two
Parameters
----------
n_bins : int
Number of bins KBinsDiscretizer has to produce for each variable
Raises
------
ValueError
in case ``n_bins`` is not an integer or if ``n_bins < 2``
"""
if not isinstance(n_bins, numbers.Integral):
raise ValueError("{} received an invalid n_bins type. "
"Received {}, expected int."
.format(KBinsDiscretizer.__name__,
type(n_bins).__name__))
if n_bins < 2:
raise ValueError("{} received an invalid number "
"of bins. Received {}, expected at least 2."
.format(KBinsDiscretizer.__name__, n_bins))
[docs] def attributes_to_dict(self) -> dict:
"""Return the attributes of KBinsDiscretizer in a dictionary
Returns
-------
dict
Contains the attributes of KBinsDiscretizer instance with the names
as keys
"""
params = self.get_params()
params["_bins_by_column"] = {
key: [list(tup) for tup in value] if value else None
for key, value in self._bins_by_column.items()
}
return params
[docs] def set_attributes_from_dict(self, params: dict):
"""Set instance attributes from a dictionary of values with key the
name of the attribute.
Parameters
----------
params : dict
Contains the attributes of KBinsDiscretizer with their
names as key.
Raises
------
ValueError
In case `_bins_by_column` is not of type dict
"""
_bins_by_column = params.pop("_bins_by_column", {})
if type(_bins_by_column) != dict:
raise ValueError("_bins_by_column is expected to be a dict "
"but is of type {} instead"
.format(type(_bins_by_column)))
# Clean out params dictionary to remove unknown keys (for safety!)
params = {key: params[key] for key in params if key in self.valid_keys}
# We cannot turn this method into a classmethod as we want to make use
# of the following method from BaseEstimator:
self.set_params(**params)
self._bins_by_column = {
key: ([tuple(v) for v in value] if value else None)
for key, value in _bins_by_column.items()
}
return self
[docs] def fit(self, data: pd.DataFrame, column_names: list):
"""Fits the estimator
Parameters
----------
data : pd.DataFrame
Data to be discretized
column_names : list
Names of the columns of the DataFrame to discretize
"""
if self.strategy not in self.valid_strategies:
raise ValueError("{}: valid options for 'strategy' are {}. "
"Got strategy={!r} instead."
.format(KBinsDiscretizer.__name__,
self.valid_strategies, self.strategy))
for column_name in tqdm(column_names, desc="Computing "
"discretization bins..."):
if column_name not in data.columns:
log.warning("DataFrame has no column '{}', so it will be "
"skipped in fitting" .format(column_name))
continue
bins = self._fit_column(data, column_name)
# Add to bins_by_column for later use
self._bins_by_column[column_name] = bins
def _fit_column(self, data: pd.DataFrame,
column_name: str) -> List[tuple]:
"""Compute bins for a specific column in data
Parameters
----------
data : pd.DataFrame
Data to be discretized
column_name : str
Name of the column of the DataFrame to discretize
Returns
-------
List[tuple]
list of bins as tuples
"""
col_min, col_max = data[column_name].min(), data[column_name].max()
if col_min == col_max:
log.warning("Predictor '{}' is constant and "
"will be ignored in computation".format(column_name))
return None
prop_inf = (np.sum(np.isinf(data[column_name]))
/ data[column_name].shape[0])
if prop_inf > 0:
log.warning(f"Column {column_name} has "
f"{prop_inf:.1%} inf values, thus it was skipped. "
f"Consider dropping or transforming it.")
return None
prop_nan = data[column_name].isna().sum() / data[column_name].shape[0]
if prop_nan >= 0.99:
log.warning(f"Column {column_name} is"
f" {prop_nan:.1%}% NaNs, "
f"consider dropping or transforming it.")
n_bins = self.n_bins
if self.auto_adapt_bins:
size = len(data.index)
missing_pct = data[column_name].isnull().sum()/size
n_bins = int(max(round((1 - missing_pct) * n_bins), 2))
bin_edges = self._compute_bin_edges(data, column_name, n_bins,
col_min, col_max)
if len(bin_edges) < 3:
log.warning("Only 1 bin was found for predictor '{}' so it will "
"be ignored in computation".format(column_name))
return None
if len(bin_edges) < n_bins + 1:
log.warning("The number of actual bins for predictor '{}' is {} "
"which is smaller than the requested number of bins "
"{}".format(column_name, len(bin_edges) - 1, n_bins))
return self._compute_bins_from_edges(bin_edges)
def _transform_column(self, data: pd.DataFrame,
column_name: str,
bins: List[tuple]) -> pd.DataFrame:
"""Given a DataFrame, a column name and a list of bins,
create an additional column which determines the bin in which the value
of column_name lies in.
Parameters
----------
data : pd.DataFrame
Original data to be discretized
column_name : str
name of the column to discretize
bins : List[tuple]
bins to discretize the data into
Returns
-------
pd.DataFrame
original DataFrame with an added binned column
"""
interval_idx = KBinsDiscretizer._create_index(bins, self.closed)
column_name_bin = column_name + "_bin"
# use pd.cut to compute bins
data.loc[:, column_name_bin] = pd.cut(x=data[column_name],
bins=interval_idx)
# Rename bins so that the output has a proper format
bin_labels = self._create_bin_labels(bins)
data.loc[:, column_name_bin] = (data[column_name_bin]
.cat.rename_categories(bin_labels))
if data[column_name_bin].isnull().sum() > 0:
# Add an additional bin for missing values
data[column_name_bin].cat.add_categories(["Missing"], inplace=True)
# Replace NULL with "Missing"
# Otherwise these will be ignored in groupby
data[column_name_bin].fillna("Missing", inplace=True)
return data
def _compute_bin_edges(self, data: pd.DataFrame, column_name: str,
n_bins: int, col_min: float,
col_max: float) -> list:
"""Compute the bin edges for a given column, a DataFrame and the number
of required bins
Parameters
----------
data : pd.DataFrame
Data to be discretized.
column_name : str
Name of the column to discretize.
n_bins : int
Number of bins to produce.
col_min : float
Min value of the variable.
col_max : float
Max value of the variable.
Returns
-------
list
list of bin edges from which to compute the bins
"""
bin_edges = []
if self.strategy == "quantile":
bin_edges = list(data[column_name]
.quantile(np.linspace(0, 1, n_bins + 1),
interpolation="linear"))
elif self.strategy == "uniform":
bin_edges = list(np.linspace(col_min, col_max, n_bins + 1))
# nans lead to unexpected behavior during sorting,
# by replacing with inf we ensure these stay at the
# outermost edges
if math.isnan(bin_edges[0]):
bin_edges[0] = -np.inf
if math.isnan(bin_edges[-1]):
bin_edges[-1] = np.inf
if np.isnan(bin_edges).sum() > 0:
log.warning(f"Column {column_name} "
"has NaNs present in bin definitions")
# Make absolutely sure bin edges are ordered,
# in very rare situations this wasn't the case
# due to rounding in quantile calculation (e.g.
# distributions with strong mass for same value)
bin_edges = sorted(bin_edges)
# Make sure the bin_edges are unique
# and order remains the same
return list(dict.fromkeys(bin_edges))
def _compute_minimal_precision_of_bin_edges(self, bin_edges: list) -> int:
"""Compute the minimal precision of a list of bin_edges so that we end
up with a strictly ascending sequence of different numbers even when rounded.
The starting_precision attribute will be used as the initial precision.
In case of a negative starting_precision, the bin edges will be rounded
to the nearest 10, 100, ... (e.g. 5.55 -> 10, 246 -> 200, ...)
Parameters
----------
bin_edges : list
The bin edges for binning a continuous variable
Returns
-------
int
minimal precision for the bin edges
"""
precision = self.starting_precision
while True:
cont = False
for a, b in zip(bin_edges, bin_edges[1:]):
if a != b and round(a, precision) == round(b, precision):
# precision is not high enough, so increase
precision += 1
cont = True # set cont to True to keep looping
break # break out of the for loop
if not cont:
# if minimal precision was found,
# return to break out of while loop
return precision
def _compute_bins_from_edges(self, bin_edges: list) -> List[tuple]:
"""Given a list of bin edges, compute the minimal precision for which
we can make meaningful bins and make those bins
Parameters
----------
bin_edges : list
The bin edges for binning a continuous variable
Returns
-------
List[tuple]
A (sorted) list of bins as tuples
"""
# compute the minimal precision of the bin_edges
# this can be a negative number, which then
# rounds numbers to the nearest 10, 100, ...
precision = self._compute_minimal_precision_of_bin_edges(bin_edges)
bins = []
for a, b in zip(bin_edges, bin_edges[1:]):
fmt_a = round(a, precision)
fmt_b = round(b, precision)
bins.append((fmt_a, fmt_b))
return bins
@staticmethod
def _create_index(intervals: List[tuple],
closed: str = "right") -> pd.IntervalIndex:
"""Create an pd.IntervalIndex based on a list of tuples.
This is basically a wrapper around pd.IntervalIndex.from_tuples
However, the lower bound of the first entry in the list (the lower bin)
is replaced by -np.inf. Similarly, the upper bound of the last entry in
the list (upper bin) is replaced by np.inf.
Parameters
----------
intervals : List[tuple]
A list of tuples describing the intervals.
closed : str, optional
Whether the intervals should be closed on the left-side,
right-side, both or neither.
Returns
-------
pd.IntervalIndex
Description
"""
# check if closed is of the proper form
if closed not in ["left", "right"]:
raise ValueError("{}: valid options for 'closed' are {}. "
"Got strategy={!r} instead."
.format(KBinsDiscretizer.__name__,
["left", "right"], closed))
# deepcopy variable because we do not want to modify the content
# of intervals (which is still used outside of this function)
_intervals = deepcopy(intervals)
# Replace min and max with -np.inf and np.inf resp. so that these
# values are guaranteed to be included when transforming the data
_intervals[0] = (-np.inf, _intervals[0][1])
_intervals[-1] = (_intervals[-1][0], np.inf)
return pd.IntervalIndex.from_tuples(_intervals, closed)
def _create_bin_labels(self, bins: List[tuple]) -> list:
"""Given a list of bins, create a list of string containing the bins
as a string with a specific format (e.g. bin labels)
Parameters
----------
bins : List[tuple]
list of tuple containing for each bin the upper and lower bound
Returns
-------
list
list of (formatted) bin labels
"""
bin_labels = []
for interval in bins:
bin_labels.append(self.label_format.format(interval[0],
interval[1]))
# Format first and last bin as < x and > y resp.
if self.change_endpoint_format:
if self.closed == "left":
bin_labels[0] = "< {}".format(bins[0][1])
bin_labels[-1] = ">= {}".format(bins[-1][0])
else:
bin_labels[0] = "<= {}".format(bins[0][1])
bin_labels[-1] = "> {}".format(bins[-1][0])
return bin_labels