Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
import torch | |
import torch.nn as nn | |
import torch.distributions as D | |
from torch.nn import functional as F | |
import numpy as np | |
from torch.autograd import Variable | |
class BaseFlow(nn.Module): | |
def __init__(self): | |
super().__init__() | |
def sample(self, n=1, context=None, **kwargs): | |
dim = self.dim | |
if isinstance(self.dim, int): | |
dim = [dim, ] | |
spl = Variable(torch.FloatTensor(n, *dim).normal_()) | |
lgd = Variable(torch.from_numpy( | |
np.zeros(n).astype('float32'))) | |
if context is None: | |
context = Variable(torch.from_numpy( | |
np.ones((n, self.context_dim)).astype('float32'))) | |
if hasattr(self, 'gpu'): | |
if self.gpu: | |
spl = spl.cuda() | |
lgd = lgd.cuda() | |
context = context.gpu() | |
return self.forward((spl, lgd, context)) | |
def cuda(self): | |
self.gpu = True | |
return super(BaseFlow, self).cuda() | |
def varify(x): | |
return torch.autograd.Variable(torch.from_numpy(x)) | |
def oper(array,oper,axis=-1,keepdims=False): | |
a_oper = oper(array) | |
if keepdims: | |
shape = [] | |
for j,s in enumerate(array.size()): | |
shape.append(s) | |
shape[axis] = -1 | |
a_oper = a_oper.view(*shape) | |
return a_oper | |
def log_sum_exp(A, axis=-1, sum_op=torch.sum): | |
maximum = lambda x: x.max(axis)[0] | |
A_max = oper(A,maximum,axis,True) | |
summation = lambda x: sum_op(torch.exp(x-A_max), axis) | |
B = torch.log(oper(A,summation,axis,True)) + A_max | |
return B | |
delta = 1e-6 | |
logsigmoid = lambda x: -F.softplus(-x) | |
log = lambda x: torch.log(x*1e2)-np.log(1e2) | |
softplus_ = nn.Softplus() | |
softplus = lambda x: softplus_(x) + delta | |
def softmax(x, dim=-1): | |
e_x = torch.exp(x - x.max(dim=dim, keepdim=True)[0]) | |
out = e_x / e_x.sum(dim=dim, keepdim=True) | |
return out | |
class DenseSigmoidFlow(nn.Module): | |
def __init__(self, hidden_dim, in_dim=1, out_dim=1): | |
super().__init__() | |
self.in_dim = in_dim | |
self.hidden_dim = hidden_dim | |
self.out_dim = out_dim | |
self.act_a = lambda x: F.softplus(x) | |
self.act_b = lambda x: x | |
self.act_w = lambda x: torch.softmax(x, dim=3) | |
self.act_u = lambda x: torch.softmax(x, dim=3) | |
self.u_ = torch.nn.Parameter(torch.Tensor(hidden_dim, in_dim)) | |
self.w_ = torch.nn.Parameter(torch.Tensor(out_dim, hidden_dim)) | |
self.num_params = 3* hidden_dim + in_dim | |
self.reset_parameters() | |
def reset_parameters(self): | |
self.u_.data.uniform_(-0.001, 0.001) | |
self.w_.data.uniform_(-0.001, 0.001) | |
def forward(self, x, dsparams): | |
delta = 1e-7 | |
inv = np.log(np.exp(1 - delta) - 1) | |
ndim = self.hidden_dim | |
pre_u = self.u_[None, None, :, :] + dsparams[:, :, -self.in_dim:][:, :, None, :] | |
pre_w = self.w_[None, None, :, :] + dsparams[:, :, 2 * ndim:3 * ndim][:, :, None, :] | |
a = self.act_a(dsparams[:, :, 0 * ndim:1 * ndim] + inv) | |
b = self.act_b(dsparams[:, :, 1 * ndim:2 * ndim]) | |
w = self.act_w(pre_w) | |
u = self.act_u(pre_u) | |
pre_sigm = torch.sum(u * a[:, :, :, None] * x[:, :, None, :], 3) + b | |
sigm = torch.selu(pre_sigm) | |
x_pre = torch.sum(w * sigm[:, :, None, :], dim=3) | |
#x_ = torch.special.logit(x_pre, eps=1e-5) | |
#xnew = x_ | |
xnew = x_pre | |
return xnew | |
class DDSF(nn.Module): | |
def __init__(self, n_blocks=1, hidden_dim=16): | |
super().__init__() | |
self.num_params = 0 | |
if n_blocks == 1: | |
model = [DenseSigmoidFlow(hidden_dim, in_dim=1, out_dim=1)] | |
else: | |
model = [DenseSigmoidFlow(hidden_dim=hidden_dim, in_dim=1, out_dim=hidden_dim)] | |
for _ in range(n_blocks-2): | |
model += [DenseSigmoidFlow(hidden_dim=hidden_dim, in_dim=hidden_dim, out_dim=hidden_dim)] | |
model += [DenseSigmoidFlow(hidden_dim=hidden_dim, in_dim=hidden_dim, out_dim=1)] | |
self.model = nn.Sequential(*model) | |
for block in self.model: | |
self.num_params += block.num_params | |
def forward(self, x, dsparams): | |
x = x.unsqueeze(2) | |
start = 0 | |
for block in self.model: | |
block_dsparams = dsparams[:,:,start:start+block.num_params] | |
x = block(x, block_dsparams) | |
start += block.num_params | |
return x.squeeze(2) | |
def compute_jacobian(inputs, outputs): | |
batch_size = outputs.size(0) | |
outVector = torch.sum(outputs,0).view(-1) | |
outdim = outVector.size()[0] | |
jac = torch.stack([torch.autograd.grad(outVector[i], inputs, | |
retain_graph=True, create_graph=True)[0].view(batch_size, outdim) for i in range(outdim)], dim=1) | |
jacs = [jac[i,:,:] for i in range(batch_size)] | |
print(jacs[1]) | |
if __name__ == '__main__': | |
flow = DDSF(n_blocks=10, hidden_dim=50) | |
x = torch.arange(20).view(10, 2)/10.-1. | |
x = Variable(x, requires_grad=True) | |
dsparams = torch.randn(1, 2, 2*flow.num_params).repeat(10,1,1) | |
y = flow(x, dsparams) | |
print(x, y) | |
compute_jacobian(x, y) | |
""" | |
flow = ConvDenseSigmoidFlow(1,256,1) | |
dsparams = torch.randn(1, 2, 1000).repeat(10,1,1) | |
x = torch.arange(20).view(10,2,1).repeat(1,1,4).view(10,2,2,2)/10. | |
print(x.size(), dsparams.size()) | |
out = flow(x, dsparams) | |
print(x, out.flatten(2), out.size()) | |
flow = ConvDDSF(n_blocks=3) | |
dsparams = torch.randn(1, 2, flow.num_params).repeat(10,1,1) | |
x = torch.arange(80).view(10,2,4).view(10,2,2,2)/10 | |
print(x.size(), dsparams.size()) | |
out = flow(x, dsparams) | |
print(x, out.flatten(2), out.size()) | |
""" | |