# Hosting-page residue (not Python) preserved as comments so the file parses:
# jmercat's picture
# Removed history to avoid any unverified information being released
# 5769ee4
# raw
# history blame
# 13.3 kB
import math
import pytest
import torch
from risk_biased.utils.risk import (
CVaREstimator,
EntropicRiskEstimator,
get_risk_estimator,
get_risk_level_sampler,
)
torch.manual_seed(0)
@pytest.mark.parametrize(
    "estimator_params, batch_size, num_samples",
    [
        ({"type": "entropic", "eps": 1e-3}, 3, 10),
        ({"type": "entropic", "eps": 1e-6}, 5, 20),
    ],
)
def test_entropic_risk_estimator(
    estimator_params: dict, batch_size: int, num_samples: int
):
    """Sanity-check the entropic risk estimator.

    Verifies output shape, the mean fallback at risk_level == 0, the mean
    result for constant cost samples, and monotonicity in risk_level.
    """
    num_agents = 1
    estimator = get_risk_estimator(estimator_params)
    assert type(estimator) == EntropicRiskEstimator
    cost = torch.rand(batch_size, num_agents, num_samples)
    risk_level_random = torch.rand(batch_size, num_agents)
    weight = torch.ones(batch_size, num_agents, num_samples) / num_samples
    objective_random = estimator(risk_level_random, cost, weight)
    assert objective_random.shape == torch.Size([batch_size, num_agents])
    risk_level_zero = torch.zeros(batch_size, num_agents)
    objective_zero = estimator(risk_level_zero, cost, weight)
    # entropic risk should fall back to mean if risk_level is zero
    assert torch.allclose(objective_zero, (cost * weight).sum(dim=2))
    cost_same = torch.ones(batch_size, num_agents, num_samples)
    objective_same = estimator(risk_level_random, cost_same, weight)
    # entropic risk should return mean if cost samples are all the same
    assert torch.allclose(objective_same, (cost_same * weight).sum(dim=2))
    risk_level_one = torch.ones(batch_size, num_agents)
    objective_one = estimator(risk_level_one, cost, weight)
    # Renamed from `risk_level_nine`, which misleadingly held 10.0.
    risk_level_ten = 10.0 * torch.ones(batch_size, num_agents)
    objective_ten = estimator(risk_level_ten, cost, weight)
    # entropic risk should be monotone increasing as a function of risk_level
    # Use tensor .all() (builtin all() on a 2-D tensor only worked because
    # num_agents == 1, and is inconsistent with the CVaR test below).
    assert (objective_ten > objective_one).all()
@pytest.mark.parametrize(
    "estimator_params, batch_size, num_samples, num_agents",
    [
        ({"type": "cvar", "eps": 1e-3}, 3, 10, 1),
        ({"type": "cvar", "eps": 1e-6}, 5, 20, 3),
    ],
)
def test_cvar_estimator(
    estimator_params: dict, batch_size: int, num_samples: int, num_agents: int
):
    """Sanity-check the CVaR estimator.

    Verifies output shape, the mean fallback at risk_level == 0, the mean
    result for constant costs, the max fallback near risk_level == 1, and
    monotonicity in risk_level.
    """
    estimator = get_risk_estimator(estimator_params)
    assert type(estimator) == CVaREstimator

    sampled_costs = torch.rand(batch_size, num_agents, num_samples)
    random_levels = torch.rand(batch_size, num_agents)
    uniform_weights = torch.ones(batch_size, num_agents, num_samples) / num_samples

    # One objective value per (batch, agent) pair.
    out_random = estimator(random_levels, sampled_costs, uniform_weights)
    assert out_random.shape == torch.Size([batch_size, num_agents])

    # cvar should fall back to mean if risk_level is zero
    zero_levels = torch.zeros(batch_size, num_agents)
    out_zero = estimator(zero_levels, sampled_costs, uniform_weights)
    assert torch.allclose(out_zero, sampled_costs.mean(dim=2), rtol=1e-3, atol=1e-3)

    # cvar should return mean if cost samples are all the same
    constant_costs = torch.ones(batch_size, num_agents, num_samples)
    out_constant = estimator(random_levels, constant_costs, uniform_weights)
    assert torch.allclose(out_constant, constant_costs.mean(dim=2))

    # cvar should fall back to max if risk_level is close to one
    near_one_levels = torch.ones(batch_size, num_agents) - 1e-2
    out_near_one = estimator(near_one_levels, sampled_costs, uniform_weights)
    one_levels = torch.ones(batch_size, num_agents)
    out_one = estimator(one_levels, sampled_costs, uniform_weights)
    worst_case = sampled_costs.max(dim=2).values
    assert torch.allclose(out_near_one, worst_case)
    assert torch.allclose(out_one, worst_case)

    # cvar should be monotone increasing as a function of risk_level
    quarter_levels = 0.25 * torch.ones(batch_size, num_agents)
    out_quarter = estimator(quarter_levels, sampled_costs, uniform_weights)
    half_levels = 0.5 * torch.ones(batch_size, num_agents)
    out_half = estimator(half_levels, sampled_costs, uniform_weights)
    assert (out_half > out_quarter).all()
def test_risk_estimator_raise():
    """Incomplete estimator configurations must raise RuntimeError."""
    for bad_params in ({}, {"type": "entropic"}, {"eps": 1e-3}):
        with pytest.raises(RuntimeError):
            get_risk_estimator(bad_params)
@pytest.mark.parametrize(
    "distribution_params, num_samples, device",
    [
        ({"type": "uniform", "min": 0, "max": 1}, 1000, "cpu"),
        ({"type": "uniform", "min": 0, "max": 1}, 10000, "cuda"),
        ({"type": "uniform", "min": 10, "max": 100}, 10000, "cpu"),
    ],
)
def test_uniform_sampler(distribution_params: dict, num_samples: int, device: str):
    """Compare empirical moments of the uniform sampler against the
    closed-form mean and standard deviation, within a sample-size tolerance."""
    lo = distribution_params["min"]
    hi = distribution_params["max"]
    tol_mean = 3 / math.sqrt(num_samples)
    tol_std = 3 / math.pow(num_samples, 2 / 5)
    expected_mean = (hi + lo) / 2
    expected_std = (hi - lo) / math.sqrt(12)
    sampler = get_risk_level_sampler(distribution_params=distribution_params)
    draws = sampler.sample(num_samples, device)
    std, mean = torch.std_mean(draws)
    assert draws.shape == torch.Size([num_samples])
    assert torch.abs(mean - expected_mean) / expected_std < tol_mean
    assert torch.abs(std - expected_std) / expected_std < tol_std
@pytest.mark.parametrize(
    "distribution_params, num_samples, device",
    [
        ({"type": "normal", "mean": 0, "sigma": 1}, 1000, "cpu"),
        ({"type": "normal", "mean": 0, "sigma": 3}, 10000, "cuda"),
        ({"type": "normal", "mean": 3, "sigma": 10}, 10000, "cpu"),
        ({"type": "normal", "mean": 1, "sigma": 3}, 100000, "cpu"),
    ],
)
def test_normal_sampler(distribution_params: dict, num_samples: int, device: str):
    """Compare empirical moments of the normal sampler against the configured
    mean and sigma, within a sample-size tolerance."""
    tol_mean = 3 / math.sqrt(num_samples)
    tol_std = 3 / math.pow(num_samples, 2 / 5)
    expected_mean = distribution_params["mean"]
    expected_std = distribution_params["sigma"]
    sampler = get_risk_level_sampler(distribution_params=distribution_params)
    draws = sampler.sample(num_samples, device)
    std, mean = torch.std_mean(draws)
    assert draws.shape == torch.Size([num_samples])
    assert torch.abs(mean - expected_mean) / expected_std < tol_mean
    assert torch.abs(std - expected_std) / expected_std < tol_std
@pytest.mark.parametrize(
    "distribution_params, num_samples, device",
    [
        ({"type": "bernoulli", "min": 0, "max": 1, "p": 0.5}, 1000, "cpu"),
        ({"type": "bernoulli", "min": 0, "max": 3, "p": 0.1}, 10000, "cuda"),
        ({"type": "bernoulli", "min": 3, "max": 10, "p": 0.9}, 10000, "cpu"),
        ({"type": "bernoulli", "min": 1, "max": 3, "p": 0.5}, 100000, "cpu"),
    ],
)
def test_bernoulli_sampler(distribution_params: dict, num_samples: int, device: str):
    """Compare empirical moments of the (scaled, shifted) Bernoulli sampler
    against its closed-form mean and standard deviation."""
    # Renamed from `range`, which shadowed the builtin.
    value_range = distribution_params["max"] - distribution_params["min"]
    tol_mean = 3 / math.sqrt(num_samples)
    tol_std = 3 / math.pow(num_samples, 2 / 5)
    expected_mean = distribution_params["p"] * value_range + distribution_params["min"]
    expected_std = (
        math.sqrt(distribution_params["p"] * (1 - distribution_params["p"]))
        * value_range
    )
    sampler = get_risk_level_sampler(distribution_params=distribution_params)
    sample = sampler.sample(num_samples, device)
    std, mean = torch.std_mean(sample)
    assert sample.shape == torch.Size([num_samples])
    assert torch.abs(mean - expected_mean) / expected_std < tol_mean
    assert torch.abs(std - expected_std) / expected_std < tol_std
@pytest.mark.parametrize(
    "distribution_params, num_samples, device",
    [
        ({"type": "beta", "min": 0, "max": 1, "alpha": 0.5, "beta": 0.5}, 1000, "cpu"),
        ({"type": "beta", "min": 0, "max": 3, "alpha": 5, "beta": 1}, 10000, "cuda"),
        ({"type": "beta", "min": 3, "max": 10, "alpha": 1, "beta": 3}, 10000, "cpu"),
        ({"type": "beta", "min": 1, "max": 3, "alpha": 2, "beta": 5}, 100000, "cpu"),
    ],
)
def test_beta_sampler(distribution_params: dict, num_samples: int, device: str):
    """Compare empirical moments of the (scaled, shifted) beta sampler
    against its closed-form mean and standard deviation."""
    # Renamed from `range`, which shadowed the builtin.
    value_range = distribution_params["max"] - distribution_params["min"]
    tol_mean = 3 / math.sqrt(num_samples)
    tol_std = 3 / math.pow(num_samples, 2 / 5)
    # Fixed typo: was `aphaplusbeta`.
    alpha_plus_beta = distribution_params["alpha"] + distribution_params["beta"]
    expected_mean = (
        distribution_params["alpha"] / alpha_plus_beta * value_range
        + distribution_params["min"]
    )
    expected_std = (
        math.sqrt(distribution_params["alpha"] * distribution_params["beta"])
        / (alpha_plus_beta * math.sqrt(alpha_plus_beta + 1))
        * value_range
    )
    sampler = get_risk_level_sampler(distribution_params=distribution_params)
    sample = sampler.sample(num_samples, device)
    std, mean = torch.std_mean(sample)
    assert sample.shape == torch.Size([num_samples])
    assert torch.abs(mean - expected_mean) / expected_std < tol_mean
    assert torch.abs(std - expected_std) / expected_std < tol_std
@pytest.mark.parametrize(
    "distribution_params, num_samples, device",
    [
        ({"type": "chi2", "min": 0, "scale": 1, "k": 1}, 1000, "cpu"),
        ({"type": "chi2", "min": 0, "scale": 3, "k": 2}, 10000, "cuda"),
        ({"type": "chi2", "min": 3, "scale": 10, "k": 3}, 10000, "cpu"),
        ({"type": "chi2", "min": 1, "scale": 3, "k": 10}, 100000, "cpu"),
    ],
)
def test_chi2_sampler(distribution_params: dict, num_samples: int, device: str):
    """Compare empirical moments of the (scaled, shifted) chi-squared sampler
    against its closed-form mean and standard deviation.

    Unlike the other sampler tests, the mean tolerance here is absolute (it
    already carries the distribution's scale and variance).
    """
    dof = distribution_params["k"]
    scale = distribution_params["scale"]
    shift = distribution_params["min"]
    tol_mean = 3 * scale * math.sqrt(2 * dof / num_samples)
    tol_std = 3 / math.pow(num_samples, 2 / 5)
    expected_mean = dof * scale + shift
    expected_std = math.sqrt(2 * dof) * scale
    sampler = get_risk_level_sampler(distribution_params=distribution_params)
    draws = sampler.sample(num_samples, device)
    std, mean = torch.std_mean(draws)
    assert draws.shape == torch.Size([num_samples])
    assert torch.abs(mean - expected_mean) < tol_mean
    assert torch.abs(std - expected_std) / expected_std < tol_std
@pytest.mark.parametrize(
    "distribution_params, num_samples, device",
    [
        (
            {"type": "log-normal", "min": 0, "scale": 1, "mu": 0, "sigma": 0.5},
            100,
            "cpu",
        ),
        (
            {"type": "log-normal", "min": 0, "scale": 3, "mu": 0.3, "sigma": 1},
            100000,
            "cuda",
        ),
        (
            {"type": "log-normal", "min": 3, "scale": 10, "mu": 1, "sigma": 0.25},
            10000,
            "cpu",
        ),
        (
            {"type": "log-normal", "min": 1, "scale": 3, "mu": 0, "sigma": 1.5},
            100000,
            "cpu",
        ),
    ],
)
def test_lognormal_sampler(distribution_params: dict, num_samples: int, device: str):
    """Compare empirical moments of the (scaled, shifted) log-normal sampler
    against its closed-form mean and standard deviation."""
    mu = distribution_params["mu"]
    sigma = distribution_params["sigma"]
    scale = distribution_params["scale"]
    shift = distribution_params["min"]
    tol_mean = 3 / math.sqrt(num_samples)
    tol_std = 3 / math.pow(num_samples, 1 / 5)
    # Standard log-normal moments, then affine-transformed by scale and shift.
    raw_mean = math.exp(mu + sigma**2 / 2)
    expected_mean = raw_mean * scale + shift
    expected_std = math.sqrt(math.exp(sigma**2) - 1) * raw_mean * scale
    sampler = get_risk_level_sampler(distribution_params=distribution_params)
    draws = sampler.sample(num_samples, device)
    std, mean = torch.std_mean(draws)
    assert draws.shape == torch.Size([num_samples])
    assert torch.abs(mean - expected_mean) / expected_std < tol_mean
    assert torch.abs(std - expected_std) / expected_std < tol_std
@pytest.mark.parametrize(
    "distribution_params, num_samples, device",
    [
        (
            {"type": "log-uniform", "min": 0, "max": 1, "scale": 1},
            100,
            "cpu",
        ),
        (
            {"type": "log-uniform", "min": 1, "max": 3, "scale": 3},
            100000,
            "cuda",
        ),
    ],
)
def test_loguniform_sampler(distribution_params: dict, num_samples: int, device: str):
    """Compare empirical moments of the log-uniform sampler against its
    closed-form mean and standard deviation."""
    tol_mean = 3 / math.sqrt(num_samples)
    tol_std = 3 / math.pow(num_samples, 1 / 5)
    # Renamed from `max`/`min`, which shadowed the builtins.
    hi = distribution_params["max"]
    lo = distribution_params["min"]
    scale = distribution_params["scale"] / (hi - lo)
    # Hoisted: this log appeared three times in the original expressions.
    log_ratio = math.log((scale * hi + 1) / (scale * lo + 1))
    expected_mean = hi - ((hi - lo) / log_ratio - 1 / scale) + lo
    expected_std = math.sqrt(
        ((scale * hi + 1) ** 2 - (scale * lo + 1) ** 2) / (2 * scale**2 * log_ratio)
        - ((hi - lo) / log_ratio) ** 2
    )
    sampler = get_risk_level_sampler(distribution_params=distribution_params)
    sample = sampler.sample(num_samples, device)
    std, mean = torch.std_mean(sample)
    assert sample.shape == torch.Size([num_samples])
    assert torch.abs(mean - expected_mean) / expected_std < tol_mean
    assert torch.abs(std - expected_std) / expected_std < tol_std
def test_risk_level_sampler_raise():
    """Incomplete sampler configurations must raise RuntimeError."""
    for bad_params in ({}, {"type": "chi2"}, {"min": 0, "max": 1}):
        with pytest.raises(RuntimeError):
            get_risk_level_sampler(bad_params)