Spaces:
Running
Running
import inspect | |
import math | |
import warnings | |
from abc import ABC, abstractmethod | |
import torch | |
from torch import Tensor | |
class AbstractMonteCarloRiskEstimator(ABC): | |
"""Abstract class for Monte Carlo estimation of risk objectives""" | |
def __call__(self, risk_level: Tensor, cost: Tensor) -> Tensor: | |
"""Computes and returns the risk objective estimated on the cost tensor | |
Args: | |
risk_level: (batch_size,) tensor of risk-level at which the risk objective is computed | |
cost: (batch_size, n_samples) tensor of cost samples | |
Returns: | |
risk tensor of size (batch_size,) | |
""" | |
class EntropicRiskEstimator(AbstractMonteCarloRiskEstimator): | |
"""Monte Carlo estimator for the entropic risk objective. | |
This estimator computes the entropic risk as 1/risk_level * log( mean( exp(risk_level * cost), 1)) | |
However, this is unstable. | |
When risk_level is large, the logsumexp trick is used. | |
When risk_level is small, it computes entropic_risk for small values of risk_level as the second order Taylor expansion instead. | |
Args: | |
eps: Risk-level threshold to switch between logsumexp and Taylor expansion. Defaults to 1e-4. | |
""" | |
def __init__(self, eps: float = 1e-4) -> None: | |
self.eps = eps | |
def __call__(self, risk_level: Tensor, cost: Tensor, weights: Tensor) -> Tensor: | |
"""Computes and returns the entropic risk estimated on the cost tensor | |
Args: | |
risk_level: (batch_size, n_agents,) tensor of risk-level at which the risk objective is computed | |
cost: (batch_size, n_agents, n_samples) cost tensor | |
weights: (batch_size, n_agents, n_samples) tensor of weights for the cost samples | |
Returns: | |
entropic risk tensor of size (batch_size,) | |
""" | |
weights = weights / weights.sum(dim=-1, keepdim=True) | |
batch_size, n_agents, n_samples = cost.shape | |
entropic_risk_cost_large_sigma = ( | |
((risk_level.view(batch_size, n_agents, 1) * cost).exp() * weights) | |
.sum(-1) | |
.log() | |
) / risk_level | |
mean = (cost * weights).sum(dim=-1) | |
var = (cost**2 * weights).sum(dim=-1) - mean**2 | |
var, mean = torch.var_mean(cost, -1) | |
entropic_risk_cost_small_sigma = mean + 0.5 * risk_level * var | |
return torch.where( | |
torch.abs(risk_level) > self.eps, | |
entropic_risk_cost_large_sigma, | |
entropic_risk_cost_small_sigma, | |
) | |
class CVaREstimator(AbstractMonteCarloRiskEstimator): | |
"""Monte Carlo estimator for the conditional value-at-risk objective. | |
This estimator is proposed in the following references, and shown to be consistent. | |
- Hong et al. (2014), "Monte Carlo Methods for Value-at-Risk and Conditional Value-at-Risk: A Review" | |
- Traindade et al. (2007), "Financial prediction with constrained tail risk" | |
When risk_level is larger than 1 - eps, it falls back to the max operator | |
Args: | |
Args: | |
eps: Risk-level threshold to switch between CVaR and Max. Defaults to 1e-4. | |
""" | |
def __init__(self, eps: float = 1e-4) -> None: | |
self.eps = eps | |
def __call__(self, risk_level: Tensor, cost: Tensor, weights: Tensor) -> Tensor: | |
"""Computes and returns the conditional value-at-risk estimated on the cost tensor | |
Args: | |
risk_level: (batch_size, n_agents) tensor of risk-level in [0, 1] at which the CVaR risk is computed | |
cost: (batch_size, n_agents, n_samples) cost tensor | |
weights: (batch_size, n_agents, n_samples) tensor of weights for the cost samples | |
Returns: | |
conditional value-at-risk tensor of size (batch_size, n_agents) | |
""" | |
assert risk_level.shape[0] == cost.shape[0] | |
assert risk_level.shape[1] == cost.shape[1] | |
if weights is None: | |
weights = torch.ones_like(cost) / cost.shape[-1] | |
else: | |
weights = weights / weights.sum(dim=-1, keepdim=True) | |
if not (torch.all(0.0 <= risk_level) and torch.all(risk_level <= 1.0)): | |
warnings.warn( | |
"risk_level is defined only between 0.0 and 1.0 for CVaR. Exceeded values will be clamped." | |
) | |
risk_level = torch.clamp(risk_level, min=0.0, max=1.0) | |
cvar_risk_high = cost.max(dim=-1).values | |
sorted_indices = torch.argsort(cost, dim=-1) | |
# cost_sorted = cost.sort(dim=-1, descending=False).values | |
cost_sorted = torch.gather(cost, -1, sorted_indices) | |
weights_sorted = torch.gather(weights, -1, sorted_indices) | |
idx_to_choose = torch.argmax( | |
(weights_sorted.cumsum(dim=-1) >= risk_level.unsqueeze(-1)).float(), -1 | |
) | |
value_at_risk_mc = cost_sorted.gather(-1, idx_to_choose.unsqueeze(-1)).squeeze( | |
-1 | |
) | |
# weights_at_risk_mc = 1 - weights_sorted.cumsum(-1).gather( | |
# -1, idx_to_choose.unsqueeze(-1) | |
# ).squeeze(-1) | |
# cvar_risk_mc = value_at_risk_mc + ( | |
# (torch.relu(cost - value_at_risk_mc.unsqueeze(-1)) * weights).sum(dim=-1) | |
# / weights_at_risk_mc | |
# ) | |
# cvar = torch.where(weights_at_risk_mc < self.eps, cvar_risk_high, cvar_risk_mc) | |
cvar_risk_mc = value_at_risk_mc + 1 / (1 - risk_level) * ( | |
(torch.relu(cost - value_at_risk_mc.unsqueeze(-1)) * weights).sum(dim=-1) | |
) | |
cvar = torch.where(risk_level > 1 - self.eps, cvar_risk_high, cvar_risk_mc) | |
return cvar | |
def get_risk_estimator(estimator_params: dict) -> AbstractMonteCarloRiskEstimator: | |
"""Function that returns the Monte Carlo risk estimator hat matches the given parameters. | |
Tries to give a comprehensive feedback if the parameters are not recognized and raise an error. | |
Args: | |
Risk estimator should be one of the following types (with different parameter values as desired) : | |
{"type": "entropic", "eps": 1e-4}, | |
{"type": "cvar", "eps": 1e-4} | |
Raises: | |
RuntimeError: If the given parameter dictionary does not match one of the expected formats, raise a comprehensive error. | |
Returns: | |
A risk estimator matching the given parameters. | |
""" | |
known_types = ["entropic", "cvar"] | |
try: | |
if estimator_params["type"].lower() == "entropic": | |
expected_params = inspect.getfullargspec(EntropicRiskEstimator)[0][1:] | |
return EntropicRiskEstimator(estimator_params["eps"]) | |
elif estimator_params["type"].lower() == "cvar": | |
expected_params = inspect.getfullargspec(CVaREstimator)[0][1:] | |
return CVaREstimator(estimator_params["eps"]) | |
else: | |
raise RuntimeError( | |
f"Risk estimator '{estimator_params['type']}' is unknown. It should be one of {known_types}." | |
) | |
except KeyError: | |
if "type" in estimator_params: | |
raise RuntimeError( | |
f"""The estimator '{estimator_params['type']}' is known but the given parameters | |
{estimator_params} do not match the expected parameters {expected_params}.""" | |
) | |
else: | |
raise RuntimeError( | |
f"""The given estimator parameters {estimator_params} do not define the estimator | |
type in the field 'type'. Please add a field 'type' and set it to one of the | |
handeled types: {known_types}.""" | |
) | |
class AbstractRiskLevelSampler(ABC): | |
"""Abstract class for a risk-level sampler for training and evaluating risk-biased predictors""" | |
def sample(self, batch_size: int, device: torch.device) -> Tensor: | |
"""Returns a tensor of size batch_size with sampled risk-level values | |
Args: | |
batch_size: number of elements in the out tensor | |
device: device of the output tensor | |
Returns: | |
A tensor of shape(batch_size,) filled with sampled risk values | |
""" | |
def get_highest_risk(self, batch_size: int, device: torch.device) -> Tensor: | |
"""Returns a tensor of size batch_size with high values of risk. | |
Args: | |
batch_size: number of elements in the out tensor | |
device: device of the output tensor | |
Returns: | |
A tensor of shape (batchc_size,) filled with the highest possible risk-level | |
""" | |
class UniformRiskLevelSampler(AbstractRiskLevelSampler): | |
"""Risk-level sampler with a uniform distribution | |
Args: | |
min: minimum risk-level | |
max: maximum risk-level | |
""" | |
def __init__(self, min: int, max: int) -> None: | |
self.min = min | |
self.max = max | |
def sample(self, batch_size: int, device: torch.device) -> Tensor: | |
return torch.rand(batch_size, device=device) * (self.max - self.min) + self.min | |
def get_highest_risk(self, batch_size: int, device: torch.device) -> Tensor: | |
return torch.ones(batch_size, device=device) * self.max | |
class NormalRiskLevelSampler(AbstractRiskLevelSampler): | |
"""Risk-level sampler with a normal distribution | |
Args: | |
mean: average risk-level | |
sigma: standard deviation of the sampler | |
""" | |
def __init__(self, mean: int, sigma: int) -> None: | |
self.mean = mean | |
self.sigma = sigma | |
def sample(self, batch_size: int, device: torch.device) -> Tensor: | |
return torch.randn(batch_size, device=device) * self.sigma + self.mean | |
def get_highest_risk(self, batch_size: int, device: torch.device) -> Tensor: | |
return torch.ones(batch_size, device=device) * self.sigma * 3 | |
class BernoulliRiskLevelSampler(AbstractRiskLevelSampler): | |
"""Risk-level sampler with a scaled Bernoulli distribution | |
Args: | |
min: minimum risk-level | |
max: maximum risk-level | |
p: Bernoulli parameter | |
""" | |
def __init__(self, min: int, max: int, p: int) -> None: | |
self.min = min | |
self.max = max | |
self.p = p | |
def sample(self, batch_size: int, device: torch.device) -> Tensor: | |
return ( | |
torch.bernoulli(torch.ones(batch_size, device=device) * self.p) | |
* (self.max - self.min) | |
+ self.min | |
) | |
def get_highest_risk(self, batch_size: int, device: torch.device) -> Tensor: | |
return torch.ones(batch_size, device=device) * self.max | |
class BetaRiskLevelSampler(AbstractRiskLevelSampler): | |
"""Risk-level sampler with a scaled Beta distribution | |
Distribution properties: | |
mean = alpha*(max-min)/(alpha + beta) + min | |
mode = (alpha-1)*(max-min)/(alpha + beta - 2) + min | |
variance = alpha*beta*(max-min)**2/((alpha+beta)**2*(alpha+beta+1)) | |
Args: | |
min: minimum risk-level | |
max: maximum risk-level | |
alpha: First distribution parameter | |
beta: Second distribution parameter | |
""" | |
def __init__(self, min: int, max: int, alpha: float, beta: float) -> None: | |
self.min = min | |
self.max = max | |
self._distribution = torch.distributions.Beta( | |
torch.tensor([alpha], dtype=torch.float32), | |
torch.tensor([beta], dtype=torch.float32), | |
) | |
def alpha(self): | |
return self._distribution.concentration1.item() | |
def alpha(self, alpha: float): | |
self._distribution = torch.distributions.Beta( | |
torch.tensor([alpha], dtype=torch.float32), | |
torch.tensor([self.beta], dtype=torch.float32), | |
) | |
def beta(self): | |
return self._distribution.concentration0.item() | |
def beta(self, beta: float): | |
self._distribution = torch.distributions.Beta( | |
torch.tensor([self.alpha], dtype=torch.float32), | |
torch.tensor([beta], dtype=torch.float32), | |
) | |
def sample(self, batch_size: int, device: torch.device) -> Tensor: | |
return ( | |
self._distribution.sample((batch_size,)).to(device) * (self.max - self.min) | |
+ self.min | |
).view(batch_size) | |
def get_highest_risk(self, batch_size: int, device: torch.device) -> Tensor: | |
return torch.ones(batch_size, device=device) * self.max | |
class Chi2RiskLevelSampler(AbstractRiskLevelSampler): | |
"""Risk-level sampler with a scaled chi2 distribution | |
Distribution properties: | |
mean = k*scale + min | |
mode = max(k-2, 0)*scale + min | |
variance = 2*k*scale**2 | |
Args: | |
min: minimum risk-level | |
scale: scaling factor for the risk-level | |
k: Chi2 parameter: degrees of freedom of the distribution | |
""" | |
def __init__(self, min: int, scale: float, k: int) -> None: | |
self.min = min | |
self.scale = scale | |
self._distribution = torch.distributions.Chi2( | |
torch.tensor([k], dtype=torch.float32) | |
) | |
def k(self): | |
return self._distribution.df.item() | |
def k(self, k: int): | |
self._distribution = torch.distributions.Chi2( | |
torch.tensor([k], dtype=torch.float32) | |
) | |
def sample(self, batch_size: int, device: torch.device) -> Tensor: | |
return ( | |
self._distribution.sample((batch_size,)).to(device) * self.scale + self.min | |
).view(batch_size) | |
def get_highest_risk(self, batch_size: int, device: torch.device) -> Tensor: | |
std = self.scale * math.sqrt(2 * self.k) | |
return torch.ones(batch_size, device=device) * std * 3 | |
class LogNormalRiskLevelSampler(AbstractRiskLevelSampler): | |
"""Risk-level sampler with a scaled Beta distribution | |
Distribution properties: | |
mean = exp(mu + sigma**2/2)*scale + min | |
mode = exp(mu - sigma**2)*scale + min | |
variance = (exp(sigma**2)-1)*exp(2*mu+sigma**2)*scale**2 | |
Args: | |
min: minimum risk-level | |
scale: scaling factor for the risk-level | |
mu: First distribution parameter | |
sigma: maximum risk-level | |
""" | |
def __init__(self, min: int, scale: float, mu: float, sigma: float) -> None: | |
self.min = min | |
self.scale = scale | |
self._distribution = torch.distributions.LogNormal( | |
torch.tensor([mu], dtype=torch.float32), | |
torch.tensor([sigma], dtype=torch.float32), | |
) | |
def mu(self): | |
return self._distribution.loc.item() | |
def mu(self, mu: float): | |
self._distribution = torch.distributions.LogNormal( | |
torch.tensor([mu], dtype=torch.float32), | |
torch.tensor([self.sigma], dtype=torch.float32), | |
) | |
def sigma(self) -> float: | |
return self._distribution.scale.item() | |
def sigma(self, sigma: float): | |
self._distribution = torch.distributions.LogNormal( | |
torch.tensor([self.mu], dtype=torch.float32), | |
torch.tensor([sigma], dtype=torch.float32), | |
) | |
def sample(self, batch_size: int, device: torch.device) -> Tensor: | |
return ( | |
self._distribution.sample((batch_size,)).to(device) * self.scale + self.min | |
).view(batch_size) | |
def get_highest_risk(self, batch_size: int, device: torch.device) -> Tensor: | |
std = ( | |
(torch.exp(self.sigma.square()) - 1).sqrt() | |
* torch.exp(self.mu + self.sigma.square() / 2) | |
* self.scale | |
) | |
return torch.ones(batch_size, device=device) * 3 * std | |
class LogUniformRiskLevelSampler(AbstractRiskLevelSampler): | |
"""Risk-level sampler with a reversed log-uniform distribution (increasing density function). Between min and max. | |
Distribution properties: | |
mean = (max - min)/ln((max+1)/(min+1)) - 1/scale | |
mode = None | |
variance = (((max+1)^2 - (min+1)^2)/(2*ln((max+1)/(min+1))) - ((max - min)/ln((max+1)/(min+1)))^2) | |
Args: | |
min: minimum risk-level | |
max: maximum risk-level | |
scale: scale to apply to the sampling before applying exponential, | |
the output is rescaled back to fit in bounds [min, max] (the higher the scale the less uniform the distribution) | |
""" | |
def __init__(self, min: float, max: float, scale: float) -> None: | |
assert min >= 0 | |
assert max > min | |
assert scale > 0 | |
self.min = min | |
self.max = max | |
self.scale = scale | |
def sample(self, batch_size: int, device: torch.device) -> Tensor: | |
scale = self.scale / (self.max - self.min) | |
max = self.max * scale | |
min = self.min * scale | |
return ( | |
max | |
- ( | |
( | |
torch.rand(batch_size, device=device) | |
* (math.log(max + 1) - math.log(min + 1)) | |
+ math.log(min + 1) | |
).exp() | |
- 1 | |
) | |
+ min | |
) / scale | |
def get_highest_risk(self, batch_size: int, device: torch.device) -> Tensor: | |
return torch.ones(batch_size, device=device) * self.max | |
def get_risk_level_sampler(distribution_params: dict) -> AbstractRiskLevelSampler: | |
"""Function that returns the risk level sampler that matches the given parameters. | |
Tries to give a comprehensive feedback if the parameters are not recognized and raise an error. | |
Args: | |
Risk distribution should be one of the following types (with different parameter values as desired) : | |
{"type": "uniform", "min": 0, "max": 1}, | |
{"type": "normal", "mean": 0, "sigma": 1}, | |
{"type": "bernoulli", "p": 0.5, "min": 0, "max": 1}, | |
{"type": "beta", "alpha": 2, "beta": 5, "min": 0, "max": 1}, | |
{"type": "chi2", "k": 3, "min": 0, "scale": 1}, | |
{"type": "log-normal", "mu": 0, "sigma": 1, "min": 0, "scale": 1} | |
{"type": "log-uniform", "min": 0, "max": 1, "scale": 1} | |
Raises: | |
RuntimeError: If the given parameter dictionary does not match one of the expected formats, raise a comprehensive error. | |
Returns: | |
A risk level sampler matching the given parameters. | |
""" | |
known_types = [ | |
"uniform", | |
"normal", | |
"bernoulli", | |
"beta", | |
"chi2", | |
"log-normal", | |
"log-uniform", | |
] | |
try: | |
if distribution_params["type"].lower() == "uniform": | |
expected_params = inspect.getfullargspec(UniformRiskLevelSampler)[0][1:] | |
return UniformRiskLevelSampler( | |
distribution_params["min"], distribution_params["max"] | |
) | |
elif distribution_params["type"].lower() == "normal": | |
expected_params = inspect.getfullargspec(NormalRiskLevelSampler)[0][1:] | |
return NormalRiskLevelSampler( | |
distribution_params["mean"], distribution_params["sigma"] | |
) | |
elif distribution_params["type"].lower() == "bernoulli": | |
expected_params = inspect.getfullargspec(BernoulliRiskLevelSampler)[0][1:] | |
return BernoulliRiskLevelSampler( | |
distribution_params["min"], | |
distribution_params["max"], | |
distribution_params["p"], | |
) | |
elif distribution_params["type"].lower() == "beta": | |
expected_params = inspect.getfullargspec(BetaRiskLevelSampler)[0][1:] | |
return BetaRiskLevelSampler( | |
distribution_params["min"], | |
distribution_params["max"], | |
distribution_params["alpha"], | |
distribution_params["beta"], | |
) | |
elif distribution_params["type"].lower() == "chi2": | |
expected_params = inspect.getfullargspec(Chi2RiskLevelSampler)[0][1:] | |
return Chi2RiskLevelSampler( | |
distribution_params["min"], | |
distribution_params["scale"], | |
distribution_params["k"], | |
) | |
elif distribution_params["type"].lower() == "log-normal": | |
expected_params = inspect.getfullargspec(LogNormalRiskLevelSampler)[0][1:] | |
return LogNormalRiskLevelSampler( | |
distribution_params["min"], | |
distribution_params["scale"], | |
distribution_params["mu"], | |
distribution_params["sigma"], | |
) | |
elif distribution_params["type"].lower() == "log-uniform": | |
expected_params = inspect.getfullargspec(LogUniformRiskLevelSampler)[0][1:] | |
return LogUniformRiskLevelSampler( | |
distribution_params["min"], | |
distribution_params["max"], | |
distribution_params["scale"], | |
) | |
else: | |
raise RuntimeError( | |
f"Distribution {distribution_params['type']} is unknown. It should be one of {known_types}." | |
) | |
except KeyError: | |
if "type" in distribution_params: | |
raise RuntimeError( | |
f"The distribution '{distribution_params['type']}' is known but the given parameters {distribution_params} do not match the expected parameters {expected_params}." | |
) | |
else: | |
raise RuntimeError( | |
f"The given distribution parameters {distribution_params} do not define the distribution type in the field 'type'. Please add a field 'type' and set it to one of the handeled types: {known_types}." | |
) | |
if __name__ == "__main__": | |
import matplotlib.pyplot as plt | |
sampler = get_risk_level_sampler( | |
{"type": "log-uniform", "min": 0, "max": 1, "scale": 10} | |
) | |
# sampler = get_risk_level_sampler({"type": "normal", "mean": 0, "sigma": 1}) | |
a = sampler.sample(10000, "cpu").detach().numpy() | |
_ = plt.hist(a, bins="auto") # arguments are passed to np.histogram | |
plt.title("Histogram with 'auto' bins") | |
plt.show() | |