from typing import Callable, Optional
import warnings

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

try:
    from apex.normalization import FusedRMSNorm as RMSNorm
except ImportError:
    warnings.warn("Cannot import apex RMSNorm, switch to vanilla implementation")

    class RMSNorm(torch.nn.Module):
        """Pure-PyTorch RMSNorm used when apex's fused kernel is unavailable."""

        def __init__(self, dim: int, eps: float = 1e-6):
            """
            Initialize the RMSNorm normalization layer.

            Args:
                dim (int): Size of the learnable per-feature scale vector.
                eps (float, optional): Value added to the mean square before
                    the reciprocal square root, for numerical stability.
                    Default is 1e-6.

            Attributes:
                eps (float): The stability constant described above.
                weight (nn.Parameter): Learnable scale, initialized to ones.
            """
            super().__init__()
            self.eps = eps
            self.weight = nn.Parameter(torch.ones(dim))

        def _norm(self, x):
            """
            Divide the input by its root mean square over the last dimension.

            Args:
                x (torch.Tensor): The input tensor.

            Returns:
                torch.Tensor: ``x`` scaled by ``rsqrt(mean(x**2, -1) + eps)``.
            """
            mean_square = x.pow(2).mean(-1, keepdim=True)
            return x * torch.rsqrt(mean_square + self.eps)

        def forward(self, x):
            """
            Normalize the input and apply the learnable scale.

            Args:
                x (torch.Tensor): The input tensor.

            Returns:
                torch.Tensor: The normalized tensor multiplied by ``weight``.
            """
            # Normalize in float32, then cast back to the input's dtype
            # before the learnable scale is applied.
            normalized = self._norm(x.float()).type_as(x)
            return normalized * self.weight


def modulate(x, scale):
    """
    Scale ``x`` by ``1 + scale``, with ``scale`` broadcast over dimension 1.

    Args:
        x (torch.Tensor): Tensor to modulate.
            NOTE(review): presumably (batch, seq, dim) - confirm with callers.
        scale (torch.Tensor): Modulation factor; a new axis is inserted at
            index 1 before broadcasting.
            NOTE(review): presumably (batch, dim) - confirm with callers.

    Returns:
        torch.Tensor: ``x * (1 + scale.unsqueeze(1))``.
    """
    return x * (1 + scale.unsqueeze(1))


class LLamaFeedForward(nn.Module):
    """
    Corresponds to the FeedForward layer in Next DiT.

    Computes ``w2(silu(w1(x)) * w3(x))`` with bias-free linear layers.
    """

    def __init__(
        self,
        dim: int,
        hidden_dim: int,
        multiple_of: int,
        ffn_dim_multiplier: Optional[float] = None,
        zeros_initialize: bool = True,
        dtype: torch.dtype = torch.float32,
    ):
        """
        Build the three projection layers and initialize their weights.

        Args:
            dim (int): Input and output feature size.
            hidden_dim (int): Nominal hidden size; the width actually used is
                ``int(2 * hidden_dim / 3)``, optionally scaled by
                ``ffn_dim_multiplier``, then rounded up to a multiple of
                ``multiple_of``.
            multiple_of (int): Granularity the hidden width is rounded up to.
            ffn_dim_multiplier (float, optional): Extra factor applied to the
                hidden width before rounding. Default is None (no scaling).
            zeros_initialize (bool): If True, ``w2`` starts at zero so the
                layer initially outputs zeros; otherwise ``w2`` uses Xavier
                uniform initialization. Default is True.
            dtype (torch.dtype): Stored on the instance only; the linear
                layers are created without it and use PyTorch's default dtype.
        """
        super().__init__()
        self.dim = dim
        self.hidden_dim = hidden_dim
        self.multiple_of = multiple_of
        self.ffn_dim_multiplier = ffn_dim_multiplier
        self.zeros_initialize = zeros_initialize
        self.dtype = dtype

        # Derive the real hidden width: 2/3 of the nominal size, optional
        # multiplier, then round up to the next multiple of `multiple_of`.
        hidden_dim_calculated = int(2 * self.hidden_dim / 3)
        if self.ffn_dim_multiplier is not None:
            hidden_dim_calculated = int(self.ffn_dim_multiplier * hidden_dim_calculated)
        hidden_dim_calculated = self.multiple_of * (
            (hidden_dim_calculated + self.multiple_of - 1) // self.multiple_of
        )

        # Gate (w1), output (w2) and value (w3) projections, all without bias.
        self.w1 = nn.Linear(self.dim, hidden_dim_calculated, bias=False)
        self.w2 = nn.Linear(hidden_dim_calculated, self.dim, bias=False)
        self.w3 = nn.Linear(self.dim, hidden_dim_calculated, bias=False)

        # w2 is initialized first so the random-number consumption order is
        # the same regardless of how w1/w3 are initialized afterwards.
        if self.zeros_initialize:
            nn.init.zeros_(self.w2.weight)
        else:
            nn.init.xavier_uniform_(self.w2.weight)
        nn.init.xavier_uniform_(self.w1.weight)
        nn.init.xavier_uniform_(self.w3.weight)

    def _forward_silu_gating(self, x1, x3):
        """Return ``silu(x1) * x3``, the gated hidden activation."""
        return F.silu(x1) * x3

    def forward(self, x):
        """
        Apply the gated feed-forward transformation.

        Args:
            x (torch.Tensor): Input whose last dimension is ``dim``.

        Returns:
            torch.Tensor: Output with the same last dimension ``dim``.
        """
        gated = self._forward_silu_gating(self.w1(x), self.w3(x))
        return self.w2(gated)


class FinalLayer(nn.Module):
    """
    The final layer of Next-DiT.

    Applies a parameter-free LayerNorm, modulates it with a scale predicted
    from the conditioning input, and projects to the output size.
    """

    def __init__(self, hidden_size: int, patch_size: int, out_channels: int):
        """
        Build the normalization, projection and modulation sub-layers.

        Args:
            hidden_size (int): Feature size of the incoming tokens.
            patch_size (int): Patch size; the product of its elements (via
                ``np.prod``) times ``out_channels`` gives the output width, so
                a scalar or a sequence of ints is accepted.
            out_channels (int): Number of output channels per patch element.
        """
        super().__init__()
        self.hidden_size = hidden_size
        self.patch_size = patch_size
        self.out_channels = out_channels

        # LayerNorm without learnable parameters (elementwise_affine=False)
        self.norm_final = nn.LayerNorm(self.hidden_size, eps=1e-6, elementwise_affine=False)

        # np.prod returns a NumPy scalar; convert to a built-in int so that
        # nn.Linear stores a plain Python int as `out_features`.
        out_features = int(np.prod(self.patch_size) * self.out_channels)
        self.linear = nn.Linear(self.hidden_size, out_features, bias=True)
        nn.init.zeros_(self.linear.weight)
        nn.init.zeros_(self.linear.bias)

        self.adaLN_modulation = nn.Sequential(
            nn.SiLU(),
            nn.Linear(self.hidden_size, self.hidden_size),
        )
        # Initialize the last layer with zeros
        nn.init.zeros_(self.adaLN_modulation[1].weight)
        nn.init.zeros_(self.adaLN_modulation[1].bias)

    def forward(self, x, c):
        """
        Normalize, modulate and project the input tokens.

        Args:
            x (torch.Tensor): Token features with last dimension
                ``hidden_size``.
            c (torch.Tensor): Conditioning input with last dimension
                ``hidden_size``; it is mapped to the modulation scale.

        Returns:
            torch.Tensor: Projected output with last dimension
            ``prod(patch_size) * out_channels``.
        """
        scale = self.adaLN_modulation(c)
        x = modulate(self.norm_final(x), scale)
        x = self.linear(x)
        return x