File size: 3,547 Bytes
5085882 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
import numpy as np
import torch as th
import torch.nn as nn
from torchdiffeq import odeint
from functools import partial
from tqdm import tqdm
class sde:
"""SDE solver class"""
def __init__(
self,
drift,
diffusion,
*,
t0,
t1,
num_steps,
sampler_type,
):
assert t0 < t1, "SDE sampler has to be in forward time"
self.num_timesteps = num_steps
self.t = th.linspace(t0, t1, num_steps)
self.dt = self.t[1] - self.t[0]
self.drift = drift
self.diffusion = diffusion
self.sampler_type = sampler_type
def __Euler_Maruyama_step(self, x, mean_x, t, model, **model_kwargs):
w_cur = th.randn(x.size()).to(x)
t = th.ones(x.size(0)).to(x) * t
dw = w_cur * th.sqrt(self.dt)
drift = self.drift(x, t, model, **model_kwargs)
diffusion = self.diffusion(x, t)
mean_x = x + drift * self.dt
x = mean_x + th.sqrt(2 * diffusion) * dw
return x, mean_x
def __Heun_step(self, x, _, t, model, **model_kwargs):
w_cur = th.randn(x.size()).to(x)
dw = w_cur * th.sqrt(self.dt)
t_cur = th.ones(x.size(0)).to(x) * t
diffusion = self.diffusion(x, t_cur)
xhat = x + th.sqrt(2 * diffusion) * dw
K1 = self.drift(xhat, t_cur, model, **model_kwargs)
xp = xhat + self.dt * K1
K2 = self.drift(xp, t_cur + self.dt, model, **model_kwargs)
return xhat + 0.5 * self.dt * (K1 + K2), xhat # at last time point we do not perform the heun step
def __forward_fn(self):
"""TODO: generalize here by adding all private functions ending with steps to it"""
sampler_dict = {
"Euler": self.__Euler_Maruyama_step,
"Heun": self.__Heun_step,
}
try:
sampler = sampler_dict[self.sampler_type]
except:
raise NotImplementedError("Smapler type not implemented.")
return sampler
def sample(self, init, model, **model_kwargs):
"""forward loop of sde"""
x = init
mean_x = init
samples = []
sampler = self.__forward_fn()
for ti in self.t[:-1]:
with th.no_grad():
x, mean_x = sampler(x, mean_x, ti, model, **model_kwargs)
samples.append(x)
return samples
class ode:
"""ODE solver class"""
def __init__(
self,
drift,
*,
t0,
t1,
sampler_type,
num_steps,
atol,
rtol,
):
assert t0 < t1, "ODE sampler has to be in forward time"
self.drift = drift
self.t = th.linspace(t0, t1, num_steps)
self.atol = atol
self.rtol = rtol
self.sampler_type = sampler_type
def sample(self, x, model, **model_kwargs):
device = x[0].device if isinstance(x, tuple) else x.device
def _fn(t, x):
t = th.ones(x[0].size(0)).to(device) * t if isinstance(x, tuple) else th.ones(x.size(0)).to(device) * t
model_output = self.drift(x, t, model, **model_kwargs)
return model_output
t = self.t.to(device)
atol = [self.atol] * len(x) if isinstance(x, tuple) else [self.atol]
rtol = [self.rtol] * len(x) if isinstance(x, tuple) else [self.rtol]
samples = odeint(
_fn,
x,
t,
method=self.sampler_type,
atol=atol,
rtol=rtol
)
return samples |