import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from typing import Callable, Optional
import warnings

try:
    from apex.normalization import FusedRMSNorm as RMSNorm
except ImportError:
    warnings.warn("Cannot import apex RMSNorm, switch to vanilla implementation")

    # Pure-PyTorch fallback, used only when apex is unavailable.
    class RMSNorm(torch.nn.Module):
        def __init__(self, dim: int, eps: float = 1e-6):
            """
            Initialize the RMSNorm normalization layer.

            Args:
                dim (int): The dimension of the input tensor.
                eps (float, optional): A small value added to the denominator
                    for numerical stability. Default is 1e-6.

            Attributes:
                eps (float): A small value added to the denominator for numerical stability.
                weight (nn.Parameter): Learnable scaling parameter.
            """
            super().__init__()
            self.eps = eps
            self.weight = nn.Parameter(torch.ones(dim))

        def _norm(self, x):
            """
            Apply the RMSNorm normalization to the input tensor.

            Args:
                x (torch.Tensor): The input tensor.

            Returns:
                torch.Tensor: The normalized tensor.
            """
            return x * torch.rsqrt(x.pow(2).mean(-1, keepdim=True) + self.eps)

        def forward(self, x):
            """
            Forward pass through the RMSNorm layer.

            Args:
                x (torch.Tensor): The input tensor.

            Returns:
                torch.Tensor: The output tensor after applying RMSNorm.
            """
            output = self._norm(x.float()).type_as(x)
            return output * self.weight
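

# Illustrative sketch only (not part of the original module): a minimal shape
# check for RMSNorm. The tensor sizes are arbitrary example values; RMSNorm is
# constructed positionally so the call also works with apex's FusedRMSNorm.
def _rmsnorm_example():
    norm = RMSNorm(64)
    x = torch.randn(2, 16, 64)  # (batch, seq_len, dim)
    y = norm(x)                 # per-token rescaling to unit RMS, then learnable weight
    assert y.shape == x.shape
    return y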


def modulate(x, scale):
    """Shift-free adaLN modulation: scales x by (1 + scale), broadcast over dim 1."""
    return x * (1 + scale.unsqueeze(1))
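

# Illustrative sketch only: modulate() assumes `x` of shape (B, L, D) and
# `scale` of shape (B, D); unsqueeze(1) lets the per-sample scale broadcast
# over the sequence dimension L. Sizes below are arbitrary example values.
def _modulate_example():
    x = torch.randn(2, 16, 8)
    scale = torch.zeros(2, 8)   # zero scale leaves x unchanged: x * (1 + 0)
    out = modulate(x, scale)
    assert torch.allclose(out, x)
    return out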


class LLamaFeedForward(nn.Module):
    """
    Corresponds to the FeedForward layer in Next DiT.
    """

    def __init__(
        self,
        dim: int,
        hidden_dim: int,
        multiple_of: int,
        ffn_dim_multiplier: Optional[float] = None,
        zeros_initialize: bool = True,
        dtype: torch.dtype = torch.float32,
    ):
        super().__init__()
        self.dim = dim
        self.hidden_dim = hidden_dim
        self.multiple_of = multiple_of
        self.ffn_dim_multiplier = ffn_dim_multiplier
        self.zeros_initialize = zeros_initialize
        self.dtype = dtype

        # LLaMA-style SwiGLU sizing: shrink the hidden width to 2/3, apply the
        # optional multiplier, then round up to a multiple of `multiple_of`.
        hidden_dim_calculated = int(2 * self.hidden_dim / 3)
        if self.ffn_dim_multiplier is not None:
            hidden_dim_calculated = int(self.ffn_dim_multiplier * hidden_dim_calculated)
        hidden_dim_calculated = self.multiple_of * ((hidden_dim_calculated + self.multiple_of - 1) // self.multiple_of)

        self.w1 = nn.Linear(self.dim, hidden_dim_calculated, bias=False)
        self.w2 = nn.Linear(hidden_dim_calculated, self.dim, bias=False)
        self.w3 = nn.Linear(self.dim, hidden_dim_calculated, bias=False)

        # Zero-initializing the output projection makes the block output zeros
        # at initialization (useful in residual branches); otherwise use Xavier.
        if self.zeros_initialize:
            nn.init.zeros_(self.w2.weight)
        else:
            nn.init.xavier_uniform_(self.w2.weight)
        nn.init.xavier_uniform_(self.w1.weight)
        nn.init.xavier_uniform_(self.w3.weight)

    def _forward_silu_gating(self, x1, x3):
        return F.silu(x1) * x3

    def forward(self, x):
        return self.w2(self._forward_silu_gating(self.w1(x), self.w3(x)))
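

# Illustrative sketch only: with example sizes dim=1024, hidden_dim=4*1024=4096
# and multiple_of=256, the SwiGLU width works out to int(2 * 4096 / 3) = 2730,
# rounded up to the next multiple of 256, i.e. 2816.
def _feedforward_example():
    ffn = LLamaFeedForward(dim=1024, hidden_dim=4096, multiple_of=256)
    assert ffn.w1.out_features == 2816
    x = torch.randn(2, 16, 1024)
    y = ffn(x)                  # w2 is zero-initialized, so y is all zeros here
    assert y.shape == x.shape
    return y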


class FinalLayer(nn.Module):
    """
    The final layer of Next-DiT.
    """

    def __init__(self, hidden_size: int, patch_size: int, out_channels: int):
        super().__init__()
        self.hidden_size = hidden_size
        self.patch_size = patch_size
        self.out_channels = out_channels

        self.norm_final = nn.LayerNorm(self.hidden_size, eps=1e-6, elementwise_affine=False)
        self.linear = nn.Linear(self.hidden_size, np.prod(self.patch_size) * self.out_channels, bias=True)
        nn.init.zeros_(self.linear.weight)
        nn.init.zeros_(self.linear.bias)

        # adaLN modulation: predicts a per-sample scale from the conditioning vector.
        self.adaLN_modulation = nn.Sequential(
            nn.SiLU(),
            nn.Linear(self.hidden_size, self.hidden_size),
        )
        nn.init.zeros_(self.adaLN_modulation[1].weight)
        nn.init.zeros_(self.adaLN_modulation[1].bias)

    def forward(self, x, c):
        scale = self.adaLN_modulation(c)
        x = modulate(self.norm_final(x), scale)
        x = self.linear(x)
        return x
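

# Illustrative sketch only: a minimal end-to-end shape check for FinalLayer,
# assuming token features `x` of shape (B, L, hidden_size) and a conditioning
# vector `c` of shape (B, hidden_size). Sizes are arbitrary example values;
# note that as written, np.prod of a scalar patch_size is just patch_size, so
# the per-token output width here is patch_size * out_channels = 2 * 4 = 8.
def _final_layer_example():
    layer = FinalLayer(hidden_size=64, patch_size=2, out_channels=4)
    x = torch.randn(2, 16, 64)
    c = torch.randn(2, 64)
    out = layer(x, c)
    assert out.shape == (2, 16, 8)
    return out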