ultpixgen / modules /stage_c.py
roubaofeipi's picture
Upload 100 files
5231633 verified
raw
history blame
13.2 kB
import torch
from torch import nn
import numpy as np
import math
from .common import AttnBlock, LayerNorm2d, ResBlock, FeedForwardBlock, TimestepBlock
#from .controlnet import ControlNetDeliverer
class UpDownBlock2d(nn.Module):
def __init__(self, c_in, c_out, mode, enabled=True):
super().__init__()
assert mode in ['up', 'down']
interpolation = nn.Upsample(scale_factor=2 if mode == 'up' else 0.5, mode='bilinear',
align_corners=True) if enabled else nn.Identity()
mapping = nn.Conv2d(c_in, c_out, kernel_size=1)
self.blocks = nn.ModuleList([interpolation, mapping] if mode == 'up' else [mapping, interpolation])
def forward(self, x):
for block in self.blocks:
x = block(x.float())
return x
class StageC(nn.Module):
def __init__(self, c_in=16, c_out=16, c_r=64, patch_size=1, c_cond=2048, c_hidden=[2048, 2048], nhead=[32, 32],
blocks=[[8, 24], [24, 8]], block_repeat=[[1, 1], [1, 1]], level_config=['CTA', 'CTA'],
c_clip_text=1280, c_clip_text_pooled=1280, c_clip_img=768, c_clip_seq=4, kernel_size=3,
dropout=[0.1, 0.1], self_attn=True, t_conds=['sca', 'crp'], switch_level=[False]):
super().__init__()
self.c_r = c_r
self.t_conds = t_conds
self.c_clip_seq = c_clip_seq
if not isinstance(dropout, list):
dropout = [dropout] * len(c_hidden)
if not isinstance(self_attn, list):
self_attn = [self_attn] * len(c_hidden)
# CONDITIONING
self.clip_txt_mapper = nn.Linear(c_clip_text, c_cond)
self.clip_txt_pooled_mapper = nn.Linear(c_clip_text_pooled, c_cond * c_clip_seq)
self.clip_img_mapper = nn.Linear(c_clip_img, c_cond * c_clip_seq)
self.clip_norm = nn.LayerNorm(c_cond, elementwise_affine=False, eps=1e-6)
self.embedding = nn.Sequential(
nn.PixelUnshuffle(patch_size),
nn.Conv2d(c_in * (patch_size ** 2), c_hidden[0], kernel_size=1),
LayerNorm2d(c_hidden[0], elementwise_affine=False, eps=1e-6)
)
def get_block(block_type, c_hidden, nhead, c_skip=0, dropout=0, self_attn=True):
if block_type == 'C':
return ResBlock(c_hidden, c_skip, kernel_size=kernel_size, dropout=dropout)
elif block_type == 'A':
return AttnBlock(c_hidden, c_cond, nhead, self_attn=self_attn, dropout=dropout)
elif block_type == 'F':
return FeedForwardBlock(c_hidden, dropout=dropout)
elif block_type == 'T':
return TimestepBlock(c_hidden, c_r, conds=t_conds)
else:
raise Exception(f'Block type {block_type} not supported')
# BLOCKS
# -- down blocks
self.down_blocks = nn.ModuleList()
self.down_downscalers = nn.ModuleList()
self.down_repeat_mappers = nn.ModuleList()
for i in range(len(c_hidden)):
if i > 0:
self.down_downscalers.append(nn.Sequential(
LayerNorm2d(c_hidden[i - 1], elementwise_affine=False, eps=1e-6),
UpDownBlock2d(c_hidden[i - 1], c_hidden[i], mode='down', enabled=switch_level[i - 1])
))
else:
self.down_downscalers.append(nn.Identity())
down_block = nn.ModuleList()
for _ in range(blocks[0][i]):
for block_type in level_config[i]:
block = get_block(block_type, c_hidden[i], nhead[i], dropout=dropout[i], self_attn=self_attn[i])
down_block.append(block)
self.down_blocks.append(down_block)
if block_repeat is not None:
block_repeat_mappers = nn.ModuleList()
for _ in range(block_repeat[0][i] - 1):
block_repeat_mappers.append(nn.Conv2d(c_hidden[i], c_hidden[i], kernel_size=1))
self.down_repeat_mappers.append(block_repeat_mappers)
# -- up blocks
self.up_blocks = nn.ModuleList()
self.up_upscalers = nn.ModuleList()
self.up_repeat_mappers = nn.ModuleList()
for i in reversed(range(len(c_hidden))):
if i > 0:
self.up_upscalers.append(nn.Sequential(
LayerNorm2d(c_hidden[i], elementwise_affine=False, eps=1e-6),
UpDownBlock2d(c_hidden[i], c_hidden[i - 1], mode='up', enabled=switch_level[i - 1])
))
else:
self.up_upscalers.append(nn.Identity())
up_block = nn.ModuleList()
for j in range(blocks[1][::-1][i]):
for k, block_type in enumerate(level_config[i]):
c_skip = c_hidden[i] if i < len(c_hidden) - 1 and j == k == 0 else 0
block = get_block(block_type, c_hidden[i], nhead[i], c_skip=c_skip, dropout=dropout[i],
self_attn=self_attn[i])
up_block.append(block)
self.up_blocks.append(up_block)
if block_repeat is not None:
block_repeat_mappers = nn.ModuleList()
for _ in range(block_repeat[1][::-1][i] - 1):
block_repeat_mappers.append(nn.Conv2d(c_hidden[i], c_hidden[i], kernel_size=1))
self.up_repeat_mappers.append(block_repeat_mappers)
# OUTPUT
self.clf = nn.Sequential(
LayerNorm2d(c_hidden[0], elementwise_affine=False, eps=1e-6),
nn.Conv2d(c_hidden[0], c_out * (patch_size ** 2), kernel_size=1),
nn.PixelShuffle(patch_size),
)
# --- WEIGHT INIT ---
self.apply(self._init_weights) # General init
nn.init.normal_(self.clip_txt_mapper.weight, std=0.02) # conditionings
nn.init.normal_(self.clip_txt_pooled_mapper.weight, std=0.02) # conditionings
nn.init.normal_(self.clip_img_mapper.weight, std=0.02) # conditionings
torch.nn.init.xavier_uniform_(self.embedding[1].weight, 0.02) # inputs
nn.init.constant_(self.clf[1].weight, 0) # outputs
# blocks
for level_block in self.down_blocks + self.up_blocks:
for block in level_block:
if isinstance(block, ResBlock) or isinstance(block, FeedForwardBlock):
block.channelwise[-1].weight.data *= np.sqrt(1 / sum(blocks[0]))
elif isinstance(block, TimestepBlock):
for layer in block.modules():
if isinstance(layer, nn.Linear):
nn.init.constant_(layer.weight, 0)
def _init_weights(self, m):
if isinstance(m, (nn.Conv2d, nn.Linear)):
torch.nn.init.xavier_uniform_(m.weight)
if m.bias is not None:
nn.init.constant_(m.bias, 0)
def gen_r_embedding(self, r, max_positions=10000):
r = r * max_positions
half_dim = self.c_r // 2
emb = math.log(max_positions) / (half_dim - 1)
emb = torch.arange(half_dim, device=r.device).float().mul(-emb).exp()
emb = r[:, None] * emb[None, :]
emb = torch.cat([emb.sin(), emb.cos()], dim=1)
if self.c_r % 2 == 1: # zero pad
emb = nn.functional.pad(emb, (0, 1), mode='constant')
return emb
def gen_c_embeddings(self, clip_txt, clip_txt_pooled, clip_img):
clip_txt = self.clip_txt_mapper(clip_txt)
if len(clip_txt_pooled.shape) == 2:
clip_txt_pool = clip_txt_pooled.unsqueeze(1)
if len(clip_img.shape) == 2:
clip_img = clip_img.unsqueeze(1)
clip_txt_pool = self.clip_txt_pooled_mapper(clip_txt_pooled).view(clip_txt_pooled.size(0), clip_txt_pooled.size(1) * self.c_clip_seq, -1)
clip_img = self.clip_img_mapper(clip_img).view(clip_img.size(0), clip_img.size(1) * self.c_clip_seq, -1)
clip = torch.cat([clip_txt, clip_txt_pool, clip_img], dim=1)
clip = self.clip_norm(clip)
return clip
def _down_encode(self, x, r_embed, clip, cnet=None):
level_outputs = []
block_group = zip(self.down_blocks, self.down_downscalers, self.down_repeat_mappers)
for down_block, downscaler, repmap in block_group:
x = downscaler(x)
for i in range(len(repmap) + 1):
for block in down_block:
if isinstance(block, ResBlock) or (
hasattr(block, '_fsdp_wrapped_module') and isinstance(block._fsdp_wrapped_module,
ResBlock)):
if cnet is not None:
next_cnet = cnet()
if next_cnet is not None:
x = x + nn.functional.interpolate(next_cnet, size=x.shape[-2:], mode='bilinear',
align_corners=True)
x = block(x)
elif isinstance(block, AttnBlock) or (
hasattr(block, '_fsdp_wrapped_module') and isinstance(block._fsdp_wrapped_module,
AttnBlock)):
x = block(x, clip)
elif isinstance(block, TimestepBlock) or (
hasattr(block, '_fsdp_wrapped_module') and isinstance(block._fsdp_wrapped_module,
TimestepBlock)):
x = block(x, r_embed)
else:
x = block(x)
if i < len(repmap):
x = repmap[i](x)
level_outputs.insert(0, x)
return level_outputs
def _up_decode(self, level_outputs, r_embed, clip, cnet=None):
x = level_outputs[0]
block_group = zip(self.up_blocks, self.up_upscalers, self.up_repeat_mappers)
for i, (up_block, upscaler, repmap) in enumerate(block_group):
for j in range(len(repmap) + 1):
for k, block in enumerate(up_block):
if isinstance(block, ResBlock) or (
hasattr(block, '_fsdp_wrapped_module') and isinstance(block._fsdp_wrapped_module,
ResBlock)):
skip = level_outputs[i] if k == 0 and i > 0 else None
if skip is not None and (x.size(-1) != skip.size(-1) or x.size(-2) != skip.size(-2)):
x = torch.nn.functional.interpolate(x.float(), skip.shape[-2:], mode='bilinear',
align_corners=True)
if cnet is not None:
next_cnet = cnet()
if next_cnet is not None:
x = x + nn.functional.interpolate(next_cnet, size=x.shape[-2:], mode='bilinear',
align_corners=True)
x = block(x, skip)
elif isinstance(block, AttnBlock) or (
hasattr(block, '_fsdp_wrapped_module') and isinstance(block._fsdp_wrapped_module,
AttnBlock)):
x = block(x, clip)
elif isinstance(block, TimestepBlock) or (
hasattr(block, '_fsdp_wrapped_module') and isinstance(block._fsdp_wrapped_module,
TimestepBlock)):
x = block(x, r_embed)
else:
x = block(x)
if j < len(repmap):
x = repmap[j](x)
x = upscaler(x)
return x
def forward(self, x, r, clip_text, clip_text_pooled, clip_img, cnet=None, **kwargs):
# Process the conditioning embeddings
r_embed = self.gen_r_embedding(r)
for c in self.t_conds:
t_cond = kwargs.get(c, torch.zeros_like(r))
r_embed = torch.cat([r_embed, self.gen_r_embedding(t_cond)], dim=1)
clip = self.gen_c_embeddings(clip_text, clip_text_pooled, clip_img)
# Model Blocks
x = self.embedding(x)
if cnet is not None:
cnet = ControlNetDeliverer(cnet)
level_outputs = self._down_encode(x, r_embed, clip, cnet)
x = self._up_decode(level_outputs, r_embed, clip, cnet)
return self.clf(x)
def update_weights_ema(self, src_model, beta=0.999):
for self_params, src_params in zip(self.parameters(), src_model.parameters()):
self_params.data = self_params.data * beta + src_params.data.clone().to(self_params.device) * (1 - beta)
for self_buffers, src_buffers in zip(self.buffers(), src_model.buffers()):
self_buffers.data = self_buffers.data * beta + src_buffers.data.clone().to(self_buffers.device) * (1 - beta)