import inspect import math import warnings from abc import ABC, abstractmethod import torch from torch import Tensor class AbstractMonteCarloRiskEstimator(ABC): """Abstract class for Monte Carlo estimation of risk objectives""" @abstractmethod def __call__(self, risk_level: Tensor, cost: Tensor) -> Tensor: """Computes and returns the risk objective estimated on the cost tensor Args: risk_level: (batch_size,) tensor of risk-level at which the risk objective is computed cost: (batch_size, n_samples) tensor of cost samples Returns: risk tensor of size (batch_size,) """ class EntropicRiskEstimator(AbstractMonteCarloRiskEstimator): """Monte Carlo estimator for the entropic risk objective. This estimator computes the entropic risk as 1/risk_level * log( mean( exp(risk_level * cost), 1)) However, this is unstable. When risk_level is large, the logsumexp trick is used. When risk_level is small, it computes entropic_risk for small values of risk_level as the second order Taylor expansion instead. Args: eps: Risk-level threshold to switch between logsumexp and Taylor expansion. Defaults to 1e-4. """ def __init__(self, eps: float = 1e-4) -> None: self.eps = eps def __call__(self, risk_level: Tensor, cost: Tensor, weights: Tensor) -> Tensor: """Computes and returns the entropic risk estimated on the cost tensor Args: risk_level: (batch_size, n_agents,) tensor of risk-level at which the risk objective is computed cost: (batch_size, n_agents, n_samples) cost tensor weights: (batch_size, n_agents, n_samples) tensor of weights for the cost samples Returns: entropic risk tensor of size (batch_size,) """ weights = weights / weights.sum(dim=-1, keepdim=True) batch_size, n_agents, n_samples = cost.shape entropic_risk_cost_large_sigma = ( ((risk_level.view(batch_size, n_agents, 1) * cost).exp() * weights) .sum(-1) .log() ) / risk_level mean = (cost * weights).sum(dim=-1) var = (cost**2 * weights).sum(dim=-1) - mean**2 var, mean = torch.var_mean(cost, -1) entropic_risk_cost_small_sigma = mean + 0.5 * risk_level * var return torch.where( torch.abs(risk_level) > self.eps, entropic_risk_cost_large_sigma, entropic_risk_cost_small_sigma, ) class CVaREstimator(AbstractMonteCarloRiskEstimator): """Monte Carlo estimator for the conditional value-at-risk objective. This estimator is proposed in the following references, and shown to be consistent. - Hong et al. (2014), "Monte Carlo Methods for Value-at-Risk and Conditional Value-at-Risk: A Review" - Traindade et al. (2007), "Financial prediction with constrained tail risk" When risk_level is larger than 1 - eps, it falls back to the max operator Args: Args: eps: Risk-level threshold to switch between CVaR and Max. Defaults to 1e-4. """ def __init__(self, eps: float = 1e-4) -> None: self.eps = eps def __call__(self, risk_level: Tensor, cost: Tensor, weights: Tensor) -> Tensor: """Computes and returns the conditional value-at-risk estimated on the cost tensor Args: risk_level: (batch_size, n_agents) tensor of risk-level in [0, 1] at which the CVaR risk is computed cost: (batch_size, n_agents, n_samples) cost tensor weights: (batch_size, n_agents, n_samples) tensor of weights for the cost samples Returns: conditional value-at-risk tensor of size (batch_size, n_agents) """ assert risk_level.shape[0] == cost.shape[0] assert risk_level.shape[1] == cost.shape[1] if weights is None: weights = torch.ones_like(cost) / cost.shape[-1] else: weights = weights / weights.sum(dim=-1, keepdim=True) if not (torch.all(0.0 <= risk_level) and torch.all(risk_level <= 1.0)): warnings.warn( "risk_level is defined only between 0.0 and 1.0 for CVaR. Exceeded values will be clamped." ) risk_level = torch.clamp(risk_level, min=0.0, max=1.0) cvar_risk_high = cost.max(dim=-1).values sorted_indices = torch.argsort(cost, dim=-1) # cost_sorted = cost.sort(dim=-1, descending=False).values cost_sorted = torch.gather(cost, -1, sorted_indices) weights_sorted = torch.gather(weights, -1, sorted_indices) idx_to_choose = torch.argmax( (weights_sorted.cumsum(dim=-1) >= risk_level.unsqueeze(-1)).float(), -1 ) value_at_risk_mc = cost_sorted.gather(-1, idx_to_choose.unsqueeze(-1)).squeeze( -1 ) # weights_at_risk_mc = 1 - weights_sorted.cumsum(-1).gather( # -1, idx_to_choose.unsqueeze(-1) # ).squeeze(-1) # cvar_risk_mc = value_at_risk_mc + ( # (torch.relu(cost - value_at_risk_mc.unsqueeze(-1)) * weights).sum(dim=-1) # / weights_at_risk_mc # ) # cvar = torch.where(weights_at_risk_mc < self.eps, cvar_risk_high, cvar_risk_mc) cvar_risk_mc = value_at_risk_mc + 1 / (1 - risk_level) * ( (torch.relu(cost - value_at_risk_mc.unsqueeze(-1)) * weights).sum(dim=-1) ) cvar = torch.where(risk_level > 1 - self.eps, cvar_risk_high, cvar_risk_mc) return cvar def get_risk_estimator(estimator_params: dict) -> AbstractMonteCarloRiskEstimator: """Function that returns the Monte Carlo risk estimator hat matches the given parameters. Tries to give a comprehensive feedback if the parameters are not recognized and raise an error. Args: Risk estimator should be one of the following types (with different parameter values as desired) : {"type": "entropic", "eps": 1e-4}, {"type": "cvar", "eps": 1e-4} Raises: RuntimeError: If the given parameter dictionary does not match one of the expected formats, raise a comprehensive error. Returns: A risk estimator matching the given parameters. """ known_types = ["entropic", "cvar"] try: if estimator_params["type"].lower() == "entropic": expected_params = inspect.getfullargspec(EntropicRiskEstimator)[0][1:] return EntropicRiskEstimator(estimator_params["eps"]) elif estimator_params["type"].lower() == "cvar": expected_params = inspect.getfullargspec(CVaREstimator)[0][1:] return CVaREstimator(estimator_params["eps"]) else: raise RuntimeError( f"Risk estimator '{estimator_params['type']}' is unknown. It should be one of {known_types}." ) except KeyError: if "type" in estimator_params: raise RuntimeError( f"""The estimator '{estimator_params['type']}' is known but the given parameters {estimator_params} do not match the expected parameters {expected_params}.""" ) else: raise RuntimeError( f"""The given estimator parameters {estimator_params} do not define the estimator type in the field 'type'. Please add a field 'type' and set it to one of the handeled types: {known_types}.""" ) class AbstractRiskLevelSampler(ABC): """Abstract class for a risk-level sampler for training and evaluating risk-biased predictors""" @abstractmethod def sample(self, batch_size: int, device: torch.device) -> Tensor: """Returns a tensor of size batch_size with sampled risk-level values Args: batch_size: number of elements in the out tensor device: device of the output tensor Returns: A tensor of shape(batch_size,) filled with sampled risk values """ @abstractmethod def get_highest_risk(self, batch_size: int, device: torch.device) -> Tensor: """Returns a tensor of size batch_size with high values of risk. Args: batch_size: number of elements in the out tensor device: device of the output tensor Returns: A tensor of shape (batchc_size,) filled with the highest possible risk-level """ class UniformRiskLevelSampler(AbstractRiskLevelSampler): """Risk-level sampler with a uniform distribution Args: min: minimum risk-level max: maximum risk-level """ def __init__(self, min: int, max: int) -> None: self.min = min self.max = max def sample(self, batch_size: int, device: torch.device) -> Tensor: return torch.rand(batch_size, device=device) * (self.max - self.min) + self.min def get_highest_risk(self, batch_size: int, device: torch.device) -> Tensor: return torch.ones(batch_size, device=device) * self.max class NormalRiskLevelSampler(AbstractRiskLevelSampler): """Risk-level sampler with a normal distribution Args: mean: average risk-level sigma: standard deviation of the sampler """ def __init__(self, mean: int, sigma: int) -> None: self.mean = mean self.sigma = sigma def sample(self, batch_size: int, device: torch.device) -> Tensor: return torch.randn(batch_size, device=device) * self.sigma + self.mean def get_highest_risk(self, batch_size: int, device: torch.device) -> Tensor: return torch.ones(batch_size, device=device) * self.sigma * 3 class BernoulliRiskLevelSampler(AbstractRiskLevelSampler): """Risk-level sampler with a scaled Bernoulli distribution Args: min: minimum risk-level max: maximum risk-level p: Bernoulli parameter """ def __init__(self, min: int, max: int, p: int) -> None: self.min = min self.max = max self.p = p def sample(self, batch_size: int, device: torch.device) -> Tensor: return ( torch.bernoulli(torch.ones(batch_size, device=device) * self.p) * (self.max - self.min) + self.min ) def get_highest_risk(self, batch_size: int, device: torch.device) -> Tensor: return torch.ones(batch_size, device=device) * self.max class BetaRiskLevelSampler(AbstractRiskLevelSampler): """Risk-level sampler with a scaled Beta distribution Distribution properties: mean = alpha*(max-min)/(alpha + beta) + min mode = (alpha-1)*(max-min)/(alpha + beta - 2) + min variance = alpha*beta*(max-min)**2/((alpha+beta)**2*(alpha+beta+1)) Args: min: minimum risk-level max: maximum risk-level alpha: First distribution parameter beta: Second distribution parameter """ def __init__(self, min: int, max: int, alpha: float, beta: float) -> None: self.min = min self.max = max self._distribution = torch.distributions.Beta( torch.tensor([alpha], dtype=torch.float32), torch.tensor([beta], dtype=torch.float32), ) @property def alpha(self): return self._distribution.concentration1.item() @alpha.setter def alpha(self, alpha: float): self._distribution = torch.distributions.Beta( torch.tensor([alpha], dtype=torch.float32), torch.tensor([self.beta], dtype=torch.float32), ) @property def beta(self): return self._distribution.concentration0.item() @beta.setter def beta(self, beta: float): self._distribution = torch.distributions.Beta( torch.tensor([self.alpha], dtype=torch.float32), torch.tensor([beta], dtype=torch.float32), ) def sample(self, batch_size: int, device: torch.device) -> Tensor: return ( self._distribution.sample((batch_size,)).to(device) * (self.max - self.min) + self.min ).view(batch_size) def get_highest_risk(self, batch_size: int, device: torch.device) -> Tensor: return torch.ones(batch_size, device=device) * self.max class Chi2RiskLevelSampler(AbstractRiskLevelSampler): """Risk-level sampler with a scaled chi2 distribution Distribution properties: mean = k*scale + min mode = max(k-2, 0)*scale + min variance = 2*k*scale**2 Args: min: minimum risk-level scale: scaling factor for the risk-level k: Chi2 parameter: degrees of freedom of the distribution """ def __init__(self, min: int, scale: float, k: int) -> None: self.min = min self.scale = scale self._distribution = torch.distributions.Chi2( torch.tensor([k], dtype=torch.float32) ) @property def k(self): return self._distribution.df.item() @k.setter def k(self, k: int): self._distribution = torch.distributions.Chi2( torch.tensor([k], dtype=torch.float32) ) def sample(self, batch_size: int, device: torch.device) -> Tensor: return ( self._distribution.sample((batch_size,)).to(device) * self.scale + self.min ).view(batch_size) def get_highest_risk(self, batch_size: int, device: torch.device) -> Tensor: std = self.scale * math.sqrt(2 * self.k) return torch.ones(batch_size, device=device) * std * 3 class LogNormalRiskLevelSampler(AbstractRiskLevelSampler): """Risk-level sampler with a scaled Beta distribution Distribution properties: mean = exp(mu + sigma**2/2)*scale + min mode = exp(mu - sigma**2)*scale + min variance = (exp(sigma**2)-1)*exp(2*mu+sigma**2)*scale**2 Args: min: minimum risk-level scale: scaling factor for the risk-level mu: First distribution parameter sigma: maximum risk-level """ def __init__(self, min: int, scale: float, mu: float, sigma: float) -> None: self.min = min self.scale = scale self._distribution = torch.distributions.LogNormal( torch.tensor([mu], dtype=torch.float32), torch.tensor([sigma], dtype=torch.float32), ) @property def mu(self): return self._distribution.loc.item() @mu.setter def mu(self, mu: float): self._distribution = torch.distributions.LogNormal( torch.tensor([mu], dtype=torch.float32), torch.tensor([self.sigma], dtype=torch.float32), ) @property def sigma(self) -> float: return self._distribution.scale.item() @sigma.setter def sigma(self, sigma: float): self._distribution = torch.distributions.LogNormal( torch.tensor([self.mu], dtype=torch.float32), torch.tensor([sigma], dtype=torch.float32), ) def sample(self, batch_size: int, device: torch.device) -> Tensor: return ( self._distribution.sample((batch_size,)).to(device) * self.scale + self.min ).view(batch_size) def get_highest_risk(self, batch_size: int, device: torch.device) -> Tensor: std = ( (torch.exp(self.sigma.square()) - 1).sqrt() * torch.exp(self.mu + self.sigma.square() / 2) * self.scale ) return torch.ones(batch_size, device=device) * 3 * std class LogUniformRiskLevelSampler(AbstractRiskLevelSampler): """Risk-level sampler with a reversed log-uniform distribution (increasing density function). Between min and max. Distribution properties: mean = (max - min)/ln((max+1)/(min+1)) - 1/scale mode = None variance = (((max+1)^2 - (min+1)^2)/(2*ln((max+1)/(min+1))) - ((max - min)/ln((max+1)/(min+1)))^2) Args: min: minimum risk-level max: maximum risk-level scale: scale to apply to the sampling before applying exponential, the output is rescaled back to fit in bounds [min, max] (the higher the scale the less uniform the distribution) """ def __init__(self, min: float, max: float, scale: float) -> None: assert min >= 0 assert max > min assert scale > 0 self.min = min self.max = max self.scale = scale def sample(self, batch_size: int, device: torch.device) -> Tensor: scale = self.scale / (self.max - self.min) max = self.max * scale min = self.min * scale return ( max - ( ( torch.rand(batch_size, device=device) * (math.log(max + 1) - math.log(min + 1)) + math.log(min + 1) ).exp() - 1 ) + min ) / scale def get_highest_risk(self, batch_size: int, device: torch.device) -> Tensor: return torch.ones(batch_size, device=device) * self.max def get_risk_level_sampler(distribution_params: dict) -> AbstractRiskLevelSampler: """Function that returns the risk level sampler that matches the given parameters. Tries to give a comprehensive feedback if the parameters are not recognized and raise an error. Args: Risk distribution should be one of the following types (with different parameter values as desired) : {"type": "uniform", "min": 0, "max": 1}, {"type": "normal", "mean": 0, "sigma": 1}, {"type": "bernoulli", "p": 0.5, "min": 0, "max": 1}, {"type": "beta", "alpha": 2, "beta": 5, "min": 0, "max": 1}, {"type": "chi2", "k": 3, "min": 0, "scale": 1}, {"type": "log-normal", "mu": 0, "sigma": 1, "min": 0, "scale": 1} {"type": "log-uniform", "min": 0, "max": 1, "scale": 1} Raises: RuntimeError: If the given parameter dictionary does not match one of the expected formats, raise a comprehensive error. Returns: A risk level sampler matching the given parameters. """ known_types = [ "uniform", "normal", "bernoulli", "beta", "chi2", "log-normal", "log-uniform", ] try: if distribution_params["type"].lower() == "uniform": expected_params = inspect.getfullargspec(UniformRiskLevelSampler)[0][1:] return UniformRiskLevelSampler( distribution_params["min"], distribution_params["max"] ) elif distribution_params["type"].lower() == "normal": expected_params = inspect.getfullargspec(NormalRiskLevelSampler)[0][1:] return NormalRiskLevelSampler( distribution_params["mean"], distribution_params["sigma"] ) elif distribution_params["type"].lower() == "bernoulli": expected_params = inspect.getfullargspec(BernoulliRiskLevelSampler)[0][1:] return BernoulliRiskLevelSampler( distribution_params["min"], distribution_params["max"], distribution_params["p"], ) elif distribution_params["type"].lower() == "beta": expected_params = inspect.getfullargspec(BetaRiskLevelSampler)[0][1:] return BetaRiskLevelSampler( distribution_params["min"], distribution_params["max"], distribution_params["alpha"], distribution_params["beta"], ) elif distribution_params["type"].lower() == "chi2": expected_params = inspect.getfullargspec(Chi2RiskLevelSampler)[0][1:] return Chi2RiskLevelSampler( distribution_params["min"], distribution_params["scale"], distribution_params["k"], ) elif distribution_params["type"].lower() == "log-normal": expected_params = inspect.getfullargspec(LogNormalRiskLevelSampler)[0][1:] return LogNormalRiskLevelSampler( distribution_params["min"], distribution_params["scale"], distribution_params["mu"], distribution_params["sigma"], ) elif distribution_params["type"].lower() == "log-uniform": expected_params = inspect.getfullargspec(LogUniformRiskLevelSampler)[0][1:] return LogUniformRiskLevelSampler( distribution_params["min"], distribution_params["max"], distribution_params["scale"], ) else: raise RuntimeError( f"Distribution {distribution_params['type']} is unknown. It should be one of {known_types}." ) except KeyError: if "type" in distribution_params: raise RuntimeError( f"The distribution '{distribution_params['type']}' is known but the given parameters {distribution_params} do not match the expected parameters {expected_params}." ) else: raise RuntimeError( f"The given distribution parameters {distribution_params} do not define the distribution type in the field 'type'. Please add a field 'type' and set it to one of the handeled types: {known_types}." ) if __name__ == "__main__": import matplotlib.pyplot as plt sampler = get_risk_level_sampler( {"type": "log-uniform", "min": 0, "max": 1, "scale": 10} ) # sampler = get_risk_level_sampler({"type": "normal", "mean": 0, "sigma": 1}) a = sampler.sample(10000, "cpu").detach().numpy() _ = plt.hist(a, bins="auto") # arguments are passed to np.histogram plt.title("Histogram with 'auto' bins") plt.show()