import warnings
from typing import Callable, Dict, Tuple, Union
import numpy as np
import pandas as pd
from scipy import optimize, special, stats
from risk_distributions.formatting import (
Parameter,
Parameters,
cast_to_series,
format_call_data,
format_data,
)
class BaseDistribution:
    """Generic vectorized wrapper around scipy distributions.

    Subclasses declare the scipy distribution factory to use plus the names
    of the parameters that factory expects, and implement ``_get_parameters``
    to convert a (mean, sd) pair into those parameters.
    """

    # scipy.stats distribution factory (e.g. ``stats.norm``); set by subclasses.
    distribution = None
    # Names of the scipy parameters produced by ``_get_parameters``; set by subclasses.
    expected_parameters = ()

    def __init__(
        self, parameters: Parameters = None, mean: Parameter = None, sd: Parameter = None
    ):
        self.parameters = self.get_parameters(parameters, mean, sd)

    @classmethod
    def get_parameters(
        cls, parameters: Parameters = None, mean: Parameter = None, sd: Parameter = None
    ) -> pd.DataFrame:
        """Build the table of distribution parameters, one row per distribution.

        Exactly one of ``parameters`` or the (``mean``, ``sd``) pair must be
        provided.  Rows whose mean or sd is zero or NaN are left as all-zero
        placeholders; ``pdf``/``ppf``/``cdf`` treat such rows as
        non-computable and return NaN for them.

        Parameters
        ----------
        parameters
            Pre-calculated distribution parameters.
        mean
            Mean of each distribution.
        sd
            Standard deviation of each distribution, aligned with ``mean``.

        Returns
        -------
        pandas.DataFrame
            The scipy parameters plus the ``x_min``/``x_max`` support bounds.

        Raises
        ------
        ValueError
            If both ``parameters`` and (``mean``, ``sd``) are supplied, or if
            neither specification is complete.
        """
        required_parameters = list(cls.expected_parameters + ("x_min", "x_max"))
        if parameters is not None:
            if not (mean is None and sd is None):
                raise ValueError(
                    "You may supply either pre-calculated parameters or"
                    " mean and standard deviation but not both."
                )
            parameters = format_data(parameters, required_parameters, "parameters")
        else:
            if mean is None or sd is None:
                raise ValueError(
                    "You must supply both mean and standard deviation when"
                    " pre-calculated parameters are not provided."
                )
            mean, sd = cast_to_series(mean, sd)
            # All-zero placeholder rows; only computable rows get filled in.
            parameters = pd.DataFrame(0.0, columns=required_parameters, index=mean.index)
            computable = cls.computable_parameter_index(mean, sd)
            parameters.loc[computable, ["x_min", "x_max"]] = cls.compute_min_max(
                mean.loc[computable], sd.loc[computable]
            )
            # The scipy.stats distributions run optimization routines that handle FloatingPointErrors,
            # transforming them into RuntimeWarnings. This gets noisy in our logs.
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", RuntimeWarning)
                parameters.loc[
                    computable, list(cls.expected_parameters)
                ] = cls._get_parameters(
                    mean.loc[computable],
                    sd.loc[computable],
                    parameters.loc[computable, "x_min"],
                    parameters.loc[computable, "x_max"],
                )
        return parameters

    @staticmethod
    def computable_parameter_index(mean: pd.Series, sd: pd.Series) -> pd.Index:
        """Index of rows whose mean and sd are both nonzero and non-NaN."""
        return mean[(mean != 0) & ~np.isnan(mean) & (sd != 0) & ~np.isnan(sd)].index

    @staticmethod
    def compute_min_max(mean: pd.Series, sd: pd.Series) -> pd.DataFrame:
        """Gets the upper and lower bounds of the distribution support.

        The support is taken to be the 0.1% and 99.9% quantiles of a
        lognormal distribution with the given moments.
        """
        # noinspection PyTypeChecker
        alpha = 1 + sd**2 / mean**2
        scale = mean / np.sqrt(alpha)
        s = np.sqrt(np.log(alpha))
        x_min = stats.lognorm(s=s, scale=scale).ppf(0.001)
        x_max = stats.lognorm(s=s, scale=scale).ppf(0.999)
        return pd.DataFrame({"x_min": x_min, "x_max": x_max}, index=mean.index)

    @staticmethod
    def _get_parameters(
        mean: pd.Series, sd: pd.Series, x_min: pd.Series, x_max: pd.Series
    ) -> pd.DataFrame:
        """Convert moments to scipy parameters; implemented by subclasses."""
        raise NotImplementedError()

    def process(
        self, data: pd.Series, parameters: pd.DataFrame, process_type: str
    ) -> pd.Series:
        """Function called before and after distribution looks to handle pre- and post-processing.

        This function should look like an if sieve on the `process_type` and fall back with a call to
        this method if no processing needs to be done.

        Parameters
        ----------
        data
            The data to be processed.
        parameters
            Parameter data to be used in the processing.
        process_type
            One of `pdf_preprocess`, `pdf_postprocess`, `ppf_preprocess`, `ppf_post_process`.

        Returns
        -------
        pandas.Series
            The processed data.
        """
        return data

    def pdf(
        self, x: Union[pd.Series, np.ndarray, float, int]
    ) -> Union[pd.Series, np.ndarray, float]:
        """Probability density at ``x``; NaN for non-computable rows or ``x`` outside the support."""
        single_val = isinstance(x, (float, int))
        values_only = isinstance(x, np.ndarray)
        x, parameters = format_call_data(x, self.parameters)
        # All-zero parameter rows are non-computable placeholders; also require
        # x to lie inside the distribution's support.
        computable = parameters[
            (parameters.sum(axis=1) != 0)
            & ~np.isnan(x)
            & (parameters["x_min"] <= x)
            & (x <= parameters["x_max"])
        ].index
        x.loc[computable] = self.process(
            x.loc[computable], parameters.loc[computable], "pdf_preprocess"
        )
        p = pd.Series(np.nan, x.index)
        if not computable.empty:
            params = parameters.loc[computable, list(self.expected_parameters)]
            p.loc[computable] = self.distribution(**params.to_dict("series")).pdf(
                x.loc[computable]
            )
            p.loc[computable] = self.process(
                p.loc[computable], parameters.loc[computable], "pdf_postprocess"
            )
        if single_val:
            p = p.iloc[0]
        if values_only:
            p = p.values
        return p

    def ppf(
        self, q: Union[pd.Series, np.ndarray, float, int]
    ) -> Union[pd.Series, np.ndarray, float]:
        """Quantile function at ``q``; NaN for non-computable rows or ``q`` outside [0.001, 0.999]."""
        single_val = isinstance(q, (float, int))
        values_only = isinstance(q, np.ndarray)
        q, parameters = format_call_data(q, self.parameters)
        # Quantiles are clipped to the same [0.001, 0.999] band used to
        # define the support bounds in ``compute_min_max``.
        computable = parameters[
            (parameters.sum(axis=1) != 0)
            & ~np.isnan(q)
            & (0.001 <= q.values)
            & (q.values <= 0.999)
        ].index
        q.loc[computable] = self.process(
            q.loc[computable], parameters.loc[computable], "ppf_preprocess"
        )
        x = pd.Series(np.nan, q.index)
        if not computable.empty:
            params = parameters.loc[computable, list(self.expected_parameters)]
            x.loc[computable] = self.distribution(**params.to_dict("series")).ppf(
                q.loc[computable]
            )
            x.loc[computable] = self.process(
                x.loc[computable], parameters.loc[computable], "ppf_postprocess"
            )
        if single_val:
            x = x.iloc[0]
        if values_only:
            x = x.values
        return x

    def cdf(
        self, x: Union[pd.Series, np.ndarray, float, int]
    ) -> Union[pd.Series, np.ndarray, float]:
        """Cumulative distribution at ``x``; NaN for non-computable rows or ``x`` outside the support."""
        single_val = isinstance(x, (float, int))
        values_only = isinstance(x, np.ndarray)
        x, parameters = format_call_data(x, self.parameters)
        computable = parameters[
            (parameters.sum(axis=1) != 0)
            & ~np.isnan(x)
            & (parameters["x_min"] <= x)
            & (x <= parameters["x_max"])
        ].index
        x.loc[computable] = self.process(
            x.loc[computable], parameters.loc[computable], "cdf_preprocess"
        )
        c = pd.Series(np.nan, x.index)
        if not computable.empty:
            params = parameters.loc[computable, list(self.expected_parameters)]
            c.loc[computable] = self.distribution(**params.to_dict("series")).cdf(
                x.loc[computable]
            )
            c.loc[computable] = self.process(
                c.loc[computable], parameters.loc[computable], "cdf_postprocess"
            )
        if single_val:
            c = c.iloc[0]
        if values_only:
            c = c.values
        return c
class Beta(BaseDistribution):
    """Beta distribution shifted/scaled onto the support [x_min, x_max]."""

    distribution = stats.beta
    expected_parameters = ("a", "b", "scale", "loc")

    @staticmethod
    def _get_parameters(
        mean: pd.Series, sd: pd.Series, x_min: pd.Series, x_max: pd.Series
    ) -> pd.DataFrame:
        """Method-of-moments fit of the four-parameter beta distribution."""
        scale = x_max - x_min
        # a, b here are the mean and variance rescaled onto the unit interval,
        # not the final beta shape parameters.
        a = 1 / scale * (mean - x_min)
        # noinspection PyTypeChecker
        b = (1 / scale * sd) ** 2
        params = pd.DataFrame(
            {
                "a": a**2 / b * (1 - a) - a,
                "b": a / b * (1 - a) ** 2 + (a - 1),
                "scale": scale,
                "loc": x_min,
            },
            index=mean.index,
        )
        return params
class Exponential(BaseDistribution):
    """Exponential distribution; parameterized by the mean alone (sd is ignored)."""

    distribution = stats.expon
    expected_parameters = ("scale",)

    @staticmethod
    def _get_parameters(
        mean: pd.Series, sd: pd.Series, x_min: pd.Series, x_max: pd.Series
    ) -> pd.DataFrame:
        """The exponential's scale equals its mean; sd and bounds are unused."""
        return pd.DataFrame({"scale": mean}, index=mean.index)
class Gamma(BaseDistribution):
    """Gamma distribution parameterized by matching mean and variance."""

    distribution = stats.gamma
    expected_parameters = ("a", "scale")

    @staticmethod
    def _get_parameters(
        mean: pd.Series, sd: pd.Series, x_min: pd.Series, x_max: pd.Series
    ) -> pd.DataFrame:
        """Method-of-moments: a = (mean/sd)^2, scale = sd^2/mean."""
        # noinspection PyTypeChecker
        params = pd.DataFrame(
            {
                "a": (mean / sd) ** 2,
                "scale": sd**2 / mean,
            },
            index=mean.index,
        )
        return params
class Gumbel(BaseDistribution):
    """Right-skewed Gumbel distribution parameterized by matching moments."""

    distribution = stats.gumbel_r
    expected_parameters = ("loc", "scale")

    @staticmethod
    def _get_parameters(
        mean: pd.Series, sd: pd.Series, x_min: pd.Series, x_max: pd.Series
    ) -> pd.DataFrame:
        """Method-of-moments: scale = sd*sqrt(6)/pi, loc = mean - gamma*scale."""
        params = pd.DataFrame(
            {
                # The Gumbel mean is loc + euler_gamma*scale, so solve for loc.
                "loc": mean - (np.euler_gamma * np.sqrt(6) / np.pi * sd),
                "scale": np.sqrt(6) / np.pi * sd,
            },
            index=mean.index,
        )
        return params
class InverseGamma(BaseDistribution):
    """Inverse gamma distribution fit numerically by moment matching."""

    distribution = stats.invgamma
    expected_parameters = ("a", "scale")

    @staticmethod
    def _get_parameters(
        mean: pd.Series, sd: pd.Series, x_min: pd.Series, x_max: pd.Series
    ) -> pd.DataFrame:
        """Numerically solve for (a, scale) whose analytic moments match the targets.

        Raises
        ------
        NonConvergenceError
            If the optimizer fails to converge for any row.
        """

        def target_function(guess, m, s):
            # Squared error between guessed analytic moments and targets;
            # np.abs keeps the search in the valid positive-parameter region.
            alpha, beta = np.abs(guess)
            mean_guess = beta / (alpha - 1)
            var_guess = beta**2 / ((alpha - 1) ** 2 * (alpha - 2))
            return (m - mean_guess) ** 2 + (s**2 - var_guess) ** 2

        opt_results = _get_optimization_result(
            mean, sd, target_function, lambda m, s: np.array((m, m * s))
        )
        result_indices = range(len(mean))
        if not np.all([opt_results[k].success for k in result_indices]):
            raise NonConvergenceError("InverseGamma did not converge!!", "invgamma")
        params = pd.DataFrame(
            {
                "a": np.abs([opt_results[k].x[0] for k in result_indices]),
                "scale": np.abs([opt_results[k].x[1] for k in result_indices]),
            },
            index=mean.index,
        )
        return params
class InverseWeibull(BaseDistribution):
    """Inverse Weibull (Frechet) distribution fit numerically by moment matching."""

    distribution = stats.invweibull
    expected_parameters = ("c", "scale")

    @staticmethod
    def _get_parameters(
        mean: pd.Series, sd: pd.Series, x_min: pd.Series, x_max: pd.Series
    ) -> pd.DataFrame:
        """Numerically solve for (c, scale) whose analytic moments match the targets.

        Raises
        ------
        NonConvergenceError
            If the optimizer fails to converge for any row.
        """

        # moments from Stat Papers (2011) 52: 591. https://doi.org/10.1007/s00362-009-0271-3
        # it is much faster than using stats.invweibull.mean/var
        def target_function(guess, m, s):
            shape, scale = np.abs(guess)
            mean_guess = scale * special.gamma(1 - 1 / shape)
            var_guess = scale**2 * special.gamma(1 - 2 / shape) - mean_guess**2
            return (m - mean_guess) ** 2 + (s**2 - var_guess) ** 2

        # Initial shape >= 2.2 keeps both moments finite (variance requires shape > 2).
        opt_results = _get_optimization_result(
            mean, sd, target_function, lambda m, s: np.array((max(2.2, s / m), m))
        )
        result_indices = range(len(mean))
        if not np.all([opt_results[k].success for k in result_indices]):
            raise NonConvergenceError("InverseWeibull did not converge!!", "invweibull")
        params = pd.DataFrame(
            {
                "c": np.abs([opt_results[k].x[0] for k in result_indices]),
                "scale": np.abs([opt_results[k].x[1] for k in result_indices]),
            },
            index=mean.index,
        )
        return params
class LogLogistic(BaseDistribution):
    """Log-logistic distribution (Burr XII with d=1) fit numerically by moment matching."""

    distribution = stats.burr12
    expected_parameters = ("c", "d", "scale")

    @staticmethod
    def _get_parameters(
        mean: pd.Series, sd: pd.Series, x_min: pd.Series, x_max: pd.Series
    ) -> pd.DataFrame:
        """Numerically solve for (c, scale) whose analytic moments match the targets.

        Raises
        ------
        NonConvergenceError
            If the optimizer fails to converge for any row.
        """

        def target_function(guess, m, s):
            # Log-logistic moments: E[X] = scale*b/sin(b), b = pi/shape.
            shape, scale = np.abs(guess)
            b = np.pi / shape
            mean_guess = scale * b / np.sin(b)
            var_guess = scale**2 * 2 * b / np.sin(2 * b) - mean_guess**2
            return (m - mean_guess) ** 2 + (s**2 - var_guess) ** 2

        # Initial shape >= 2 keeps the variance finite.
        opt_results = _get_optimization_result(
            mean, sd, target_function, lambda m, s: np.array((max(2, m), m))
        )
        result_indices = range(len(mean))
        if not np.all([opt_results[k].success for k in result_indices]):
            raise NonConvergenceError("LogLogistic did not converge!!", "llogis")
        params = pd.DataFrame(
            {
                "c": np.abs([opt_results[k].x[0] for k in result_indices]),
                # Fixing d=1 reduces Burr XII to the log-logistic distribution.
                "d": 1,
                "scale": np.abs([opt_results[k].x[1] for k in result_indices]),
            },
            index=mean.index,
        )
        return params
class LogNormal(BaseDistribution):
    """Lognormal distribution parameterized by matching mean and variance."""

    distribution = stats.lognorm
    expected_parameters = ("s", "scale")

    @staticmethod
    def _get_parameters(
        mean: pd.Series, sd: pd.Series, x_min: pd.Series, x_max: pd.Series
    ) -> pd.DataFrame:
        """Method-of-moments: s = sqrt(log(1 + cv^2)), scale = mean/sqrt(1 + cv^2)."""
        # noinspection PyTypeChecker
        alpha = 1 + sd**2 / mean**2
        params = pd.DataFrame(
            {
                "s": np.sqrt(np.log(alpha)),
                "scale": mean / np.sqrt(alpha),
            },
            index=mean.index,
        )
        return params
class MirroredGumbel(BaseDistribution):
    """Gumbel distribution fit to data mirrored about the support's upper bound."""

    distribution = stats.gumbel_r
    expected_parameters = ("loc", "scale")

    @staticmethod
    def _get_parameters(
        mean: pd.Series, sd: pd.Series, x_min: pd.Series, x_max: pd.Series
    ) -> pd.DataFrame:
        """Method-of-moments fit against the mirrored mean (x_max - mean)."""
        params = pd.DataFrame(
            {
                "loc": x_max - mean - (np.euler_gamma * np.sqrt(6) / np.pi * sd),
                "scale": np.sqrt(6) / np.pi * sd,
            },
            index=mean.index,
        )
        return params

    def process(
        self, data: pd.Series, parameters: pd.DataFrame, process_type: str
    ) -> pd.Series:
        """Mirror inputs/outputs about x_max so the base machinery evaluates
        the underlying (unmirrored) Gumbel distribution."""
        x_max = parameters.loc[data.index, "x_max"]
        if process_type in ["pdf_preprocess", "cdf_preprocess"]:
            # Evaluate the underlying distribution at the mirrored point.
            value = x_max - data
        elif process_type == "ppf_preprocess":
            # Mirror the quantile level.
            # noinspection PyTypeChecker
            value = 1 - data
        elif process_type == "ppf_postprocess":
            # Mirror the resulting value back onto the original axis.
            value = x_max - data
        else:
            # NOTE(review): "cdf_postprocess" falls through to the identity, so
            # cdf returns F(x_max - x) rather than 1 - F(x_max - x), which
            # looks like the survival function of the mirrored distribution —
            # confirm this is the intended behavior with callers.
            value = super().process(data, parameters, process_type)
        return value
class MirroredGamma(BaseDistribution):
    """Gamma distribution fit to data mirrored about the support's upper bound."""

    distribution = stats.gamma
    expected_parameters = ("a", "scale")

    @staticmethod
    def _get_parameters(
        mean: pd.Series, sd: pd.Series, x_min: pd.Series, x_max: pd.Series
    ) -> pd.DataFrame:
        """Method-of-moments fit against the mirrored mean (x_max - mean)."""
        # noinspection PyTypeChecker
        params = pd.DataFrame(
            {"a": ((x_max - mean) / sd) ** 2, "scale": sd**2 / (x_max - mean)},
            index=mean.index,
        )
        return params

    def process(
        self, data: pd.Series, parameters: pd.DataFrame, process_type: str
    ) -> pd.Series:
        """Mirror inputs/outputs about x_max so the base machinery evaluates
        the underlying (unmirrored) gamma distribution."""
        x_max = parameters.loc[data.index, "x_max"]
        if process_type in ["pdf_preprocess", "cdf_preprocess"]:
            # Evaluate the underlying distribution at the mirrored point.
            value = x_max - data
        elif process_type == "ppf_preprocess":
            # Mirror the quantile level.
            # noinspection PyTypeChecker
            value = 1 - data
        elif process_type == "ppf_postprocess":
            # Mirror the resulting value back onto the original axis.
            value = x_max - data
        else:
            # NOTE(review): "cdf_postprocess" falls through to the identity, so
            # cdf returns F(x_max - x) rather than 1 - F(x_max - x), which
            # looks like the survival function of the mirrored distribution —
            # confirm this is the intended behavior with callers.
            value = super().process(data, parameters, process_type)
        return value
class Normal(BaseDistribution):
    """Normal distribution; the moments map directly onto loc and scale."""

    distribution = stats.norm
    expected_parameters = ("loc", "scale")

    @staticmethod
    def _get_parameters(
        mean: pd.Series, sd: pd.Series, x_min: pd.Series, x_max: pd.Series
    ) -> pd.DataFrame:
        """The normal distribution is parameterized directly by its moments."""
        params = pd.DataFrame(
            {
                "loc": mean,
                "scale": sd,
            },
            index=mean.index,
        )
        return params
class Weibull(BaseDistribution):
    """Weibull distribution fit numerically by moment matching."""

    distribution = stats.weibull_min
    expected_parameters = ("c", "scale")

    @staticmethod
    def _get_parameters(
        mean: pd.Series, sd: pd.Series, x_min: pd.Series, x_max: pd.Series
    ) -> pd.DataFrame:
        """Numerically solve for (c, scale) whose analytic moments match the targets.

        Raises
        ------
        NonConvergenceError
            If the optimizer fails to converge for any row.
        """

        def target_function(guess, m, s):
            # Squared error between guessed analytic moments and targets;
            # np.abs keeps the search in the valid positive-parameter region.
            shape, scale = np.abs(guess)
            mean_guess = scale * special.gamma(1 + 1 / shape)
            var_guess = scale**2 * special.gamma(1 + 2 / shape) - mean_guess**2
            return (m - mean_guess) ** 2 + (s**2 - var_guess) ** 2

        opt_results = _get_optimization_result(
            mean, sd, target_function, lambda m, s: np.array((m, m / s))
        )
        result_indices = range(len(mean))
        if not np.all([opt_results[k].success for k in result_indices]):
            raise NonConvergenceError("Weibull did not converge!!", "weibull")
        params = pd.DataFrame(
            {
                "c": np.abs([opt_results[k].x[0] for k in result_indices]),
                "scale": np.abs([opt_results[k].x[1] for k in result_indices]),
            },
            index=mean.index,
        )
        return params
class EnsembleDistribution:
    """Represents an arbitrary distribution as a weighted sum of several concrete distribution types."""

    # GBD ensemble-weight keys mapped to their concrete distribution classes.
    _distribution_map = {
        "betasr": Beta,
        "exp": Exponential,
        "gamma": Gamma,
        "gumbel": Gumbel,
        "invgamma": InverseGamma,
        "invweibull": InverseWeibull,
        "llogis": LogLogistic,
        "lnorm": LogNormal,
        "mgamma": MirroredGamma,
        "mgumbel": MirroredGumbel,
        "norm": Normal,
        "weibull": Weibull,
    }

    def __init__(
        self,
        weights: Parameters,
        parameters: Dict[str, Parameters] = None,
        mean: Parameter = None,
        sd: Parameter = None,
    ):
        self.weights, self.parameters = self.get_parameters(weights, parameters, mean, sd)

    @classmethod
    def get_parameters(
        cls,
        weights: Parameters,
        parameters: Dict[str, Parameters] = None,
        mean: Parameter = None,
        sd: Parameter = None,
    ) -> Tuple[pd.DataFrame, Dict[str, pd.DataFrame]]:
        """Fit every member distribution and normalize the ensemble weights.

        Member distributions that fail to converge are dropped (their weight
        is zeroed and the remaining weights rescaled) when their maximum
        weight is below 5%; otherwise the convergence failure propagates.

        Parameters
        ----------
        weights
            Ensemble weights, one column per distribution key.
        parameters
            Optional pre-calculated parameters keyed by distribution name.
        mean
            Means, used when ``parameters`` is not supplied.
        sd
            Standard deviations, used when ``parameters`` is not supplied.

        Returns
        -------
        Tuple[pandas.DataFrame, Dict[str, pandas.DataFrame]]
            The (possibly rescaled) weights and the per-distribution
            parameter tables.

        Raises
        ------
        NonConvergenceError
            If a member with non-negligible weight fails to converge.
        """
        weights = format_data(weights, list(cls._distribution_map.keys()), "weights")
        params = {}
        for name, dist in cls._distribution_map.items():
            try:
                param = parameters[name] if parameters else None
                params[name] = dist.get_parameters(param, mean, sd)
            except NonConvergenceError:
                # Tolerate divergence only for members with negligible weight.
                if weights[name].max() < 0.05:
                    weights.loc[name, :] = 0
                else:
                    raise NonConvergenceError(
                        f"Divergent {name} distribution has "
                        f"weights: {100 * weights[name]}%",
                        name,
                    )
        # Rescale weights in case we floored any of them:
        non_zero_rows = weights[weights.sum(axis=1) != 0]
        weights.loc[non_zero_rows.index] = non_zero_rows.divide(
            non_zero_rows.sum(axis=1), axis=0
        )
        return weights, params

    def pdf(
        self, x: Union[pd.Series, np.ndarray, float, int]
    ) -> Union[pd.Series, np.ndarray, float]:
        """Weighted sum of the member densities at ``x``; NaN where weights are all zero."""
        single_val = isinstance(x, (float, int))
        values_only = isinstance(x, np.ndarray)
        x, weights = format_call_data(x, self.weights)
        computable = weights[(weights.sum(axis=1) != 0) & ~np.isnan(x)].index
        p = pd.Series(np.nan, index=x.index)
        if not computable.empty:
            p.loc[computable] = 0
            for name, parameters in self.parameters.items():
                w = weights.loc[computable, name]
                # Single-row parameter tables are shared across all rows.
                params = parameters.loc[computable] if len(parameters) > 1 else parameters
                p += w * self._distribution_map[name](parameters=params).pdf(
                    x.loc[computable]
                )
        if single_val:
            p = p.iloc[0]
        if values_only:
            p = p.values
        return p

    def ppf(
        self,
        q: Union[pd.Series, np.ndarray, float, int],
        q_dist: Union[pd.Series, np.ndarray, float, int],
    ) -> Union[pd.Series, np.ndarray, float]:
        """Quantile function using 2 propensities.

        Parameters
        ----------
        q :
            value propensity
        q_dist :
            propensity for picking the distribution

        Returns
        -------
        Union[pandas.Series, numpy.ndarray, float]
            Sample from the ensembled distribution.
        """
        single_val = isinstance(q, (float, int))
        values_only = isinstance(q, np.ndarray)
        q, weights = format_call_data(q, self.weights)
        q_dist, _ = format_call_data(q_dist, self.weights)
        computable = weights[(weights.sum(axis=1) != 0) & ~np.isnan(q)].index
        x = pd.Series(np.nan, index=q.index)
        if not computable.empty:
            # Use q_dist to pick one member distribution per row: count how
            # many cumulative-weight bin edges each propensity exceeds.
            p_bins = np.cumsum(weights.loc[computable], axis=1)
            choice_index = (q_dist.loc[computable].values[np.newaxis].T > p_bins).sum(axis=1)
            x.loc[computable] = 0
            idx_lookup = {v: i for i, v in enumerate(weights.columns)}
            for name, parameters in self.parameters.items():
                idx = choice_index[choice_index == idx_lookup[name]].index
                if len(idx):
                    params = (
                        parameters.loc[computable.intersection(idx)]
                        if len(parameters) > 1
                        else parameters
                    )
                    x[idx] = self._distribution_map[name](parameters=params).ppf(q[idx])
        if single_val:
            x = x.iloc[0]
        if values_only:
            x = x.values
        return x

    def cdf(
        self, x: Union[pd.Series, np.ndarray, float, int]
    ) -> Union[pd.Series, np.ndarray, float]:
        """Weighted sum of the member CDFs at ``x``; NaN where weights are all zero."""
        single_val = isinstance(x, (float, int))
        values_only = isinstance(x, np.ndarray)
        x, weights = format_call_data(x, self.weights)
        computable = weights[(weights.sum(axis=1) != 0) & ~np.isnan(x)].index
        c = pd.Series(np.nan, index=x.index)
        c.loc[computable] = 0
        if not computable.empty:
            for name, parameters in self.parameters.items():
                w = weights.loc[computable, name]
                # Single-row parameter tables are shared across all rows.
                params = parameters.loc[computable] if len(parameters) > 1 else parameters
                c += w * self._distribution_map[name](parameters=params).cdf(
                    x.loc[computable]
                )
        if single_val:
            c = c.iloc[0]
        if values_only:
            c = c.values
        return c
class NonConvergenceError(Exception):
    """Raised when the optimization fails to converge.

    Attributes
    ----------
    dist
        Short key of the distribution that failed to converge (e.g. "weibull").
    """

    def __init__(self, message: str, dist: str) -> None:
        super().__init__(message)
        self.dist = dist
def _get_optimization_result(
mean: pd.Series, sd: pd.Series, func: Callable, initial_func: Callable
) -> Tuple:
"""Finds the shape parameters of distributions which generates mean/sd close to actual mean/sd.
Parameters
---------
mean :
Series where each row has a mean for a single distribution, matches with sd.
sd :
Series where each row has a standard deviation for a single distribution, matches with mean.
func:
The optimization objective function. Takes arguments `initial guess`, `mean`, and `standard_deviation`.
initial_func:
Function to produce initial guess from a `mean` and `standard_deviation`.
Returns
--------
A tuple of the optimization results.
"""
mean, sd = mean.values, sd.values
results = []
with np.errstate(all="warn"):
for i in range(len(mean)):
initial_guess = initial_func(mean[i], sd[i])
result = optimize.minimize(
func,
initial_guess,
(
mean[i],
sd[i],
),
method="Nelder-Mead",
options={"maxiter": 10000},
)
results.append(result)
return tuple(results)