from .attention import Attention
from .tiler import TileWorker
from einops import repeat, rearrange
import math
import torch


class HunyuanDiTRotaryEmbedding(torch.nn.Module):

    def __init__(self, q_norm_shape=88, k_norm_shape=88, rotary_emb_on_k=True):
        super().__init__()
        self.q_norm = torch.nn.LayerNorm((q_norm_shape,), elementwise_affine=True, eps=1e-06)
        self.k_norm = torch.nn.LayerNorm((k_norm_shape,), elementwise_affine=True, eps=1e-06)
        self.rotary_emb_on_k = rotary_emb_on_k
        self.k_cache, self.v_cache = [], []

    def reshape_for_broadcast(self, freqs_cis, x):
        ndim = x.ndim
        shape = [d if i == ndim - 2 or i == ndim - 1 else 1 for i, d in enumerate(x.shape)]
        return freqs_cis[0].view(*shape), freqs_cis[1].view(*shape)

    def rotate_half(self, x):
        x_real, x_imag = x.float().reshape(*x.shape[:-1], -1, 2).unbind(-1)
        return torch.stack([-x_imag, x_real], dim=-1).flatten(3)

    def apply_rotary_emb(self, xq, xk, freqs_cis):
        xk_out = None
        cos, sin = self.reshape_for_broadcast(freqs_cis, xq)
        cos, sin = cos.to(xq.device), sin.to(xq.device)
        xq_out = (xq.float() * cos + self.rotate_half(xq.float()) * sin).type_as(xq)
        if xk is not None:
            xk_out = (xk.float() * cos + self.rotate_half(xk.float()) * sin).type_as(xk)
        return xq_out, xk_out

    def forward(self, q, k, v, freqs_cis_img, to_cache=False):
        # norm
        q = self.q_norm(q)
        k = self.k_norm(k)

        # RoPE
        if self.rotary_emb_on_k:
            q, k = self.apply_rotary_emb(q, k, freqs_cis_img)
        else:
            q, _ = self.apply_rotary_emb(q, None, freqs_cis_img)
        
        if to_cache:
            self.k_cache.append(k)
            self.v_cache.append(v)
        elif len(self.k_cache) > 0 and len(self.v_cache) > 0:
            k = torch.concat([k] + self.k_cache, dim=2)
            v = torch.concat([v] + self.v_cache, dim=2)
            self.k_cache, self.v_cache = [], []
        return q, k, v


class FP32_Layernorm(torch.nn.LayerNorm):
    def forward(self, inputs):
        origin_dtype = inputs.dtype
        return torch.nn.functional.layer_norm(inputs.float(), self.normalized_shape, self.weight.float(), self.bias.float(), self.eps).to(origin_dtype)


class FP32_SiLU(torch.nn.SiLU):
    def forward(self, inputs):
        origin_dtype = inputs.dtype
        return torch.nn.functional.silu(inputs.float(), inplace=False).to(origin_dtype)
    

class HunyuanDiTFinalLayer(torch.nn.Module):
    def __init__(self, final_hidden_size=1408, condition_dim=1408, patch_size=2, out_channels=8):
        super().__init__()
        self.norm_final = torch.nn.LayerNorm(final_hidden_size, elementwise_affine=False, eps=1e-6)
        self.linear = torch.nn.Linear(final_hidden_size, patch_size * patch_size * out_channels, bias=True)
        self.adaLN_modulation = torch.nn.Sequential(
            FP32_SiLU(),
            torch.nn.Linear(condition_dim, 2 * final_hidden_size, bias=True)
        )

    def modulate(self, x, shift, scale):
        return x * (1 + scale.unsqueeze(1)) + shift.unsqueeze(1)

    def forward(self, hidden_states, condition_emb):
        shift, scale = self.adaLN_modulation(condition_emb).chunk(2, dim=1)
        hidden_states = self.modulate(self.norm_final(hidden_states), shift, scale)
        hidden_states = self.linear(hidden_states)
        return hidden_states


class HunyuanDiTBlock(torch.nn.Module):

    def __init__(
        self,
        hidden_dim=1408,
        condition_dim=1408,
        num_heads=16,
        mlp_ratio=4.3637,
        text_dim=1024,
        skip_connection=False
    ):
        super().__init__()
        self.norm1 = FP32_Layernorm((hidden_dim,), eps=1e-6, elementwise_affine=True)
        self.rota1 = HunyuanDiTRotaryEmbedding(hidden_dim//num_heads, hidden_dim//num_heads)
        self.attn1 = Attention(hidden_dim, num_heads, hidden_dim//num_heads, bias_q=True, bias_kv=True, bias_out=True)
        self.norm2 = FP32_Layernorm((hidden_dim,), eps=1e-6, elementwise_affine=True)
        self.rota2 = HunyuanDiTRotaryEmbedding(hidden_dim//num_heads, hidden_dim//num_heads, rotary_emb_on_k=False)
        self.attn2 = Attention(hidden_dim, num_heads, hidden_dim//num_heads, kv_dim=text_dim, bias_q=True, bias_kv=True, bias_out=True)
        self.norm3 = FP32_Layernorm((hidden_dim,), eps=1e-6, elementwise_affine=True)
        self.modulation = torch.nn.Sequential(FP32_SiLU(), torch.nn.Linear(condition_dim, hidden_dim, bias=True))
        self.mlp = torch.nn.Sequential(
            torch.nn.Linear(hidden_dim, int(hidden_dim*mlp_ratio), bias=True),
            torch.nn.GELU(approximate="tanh"),
            torch.nn.Linear(int(hidden_dim*mlp_ratio), hidden_dim, bias=True)
        )
        if skip_connection:
            self.skip_norm = FP32_Layernorm((hidden_dim * 2,), eps=1e-6, elementwise_affine=True)
            self.skip_linear = torch.nn.Linear(hidden_dim * 2, hidden_dim, bias=True)
        else:
            self.skip_norm, self.skip_linear = None, None

    def forward(self, hidden_states, condition_emb, text_emb, freq_cis_img, residual=None, to_cache=False):
        # Long Skip Connection
        if self.skip_norm is not None and self.skip_linear is not None:
            hidden_states = torch.cat([hidden_states, residual], dim=-1)
            hidden_states = self.skip_norm(hidden_states)
            hidden_states = self.skip_linear(hidden_states)

        # Self-Attention
        shift_msa = self.modulation(condition_emb).unsqueeze(dim=1)
        attn_input = self.norm1(hidden_states) + shift_msa
        hidden_states = hidden_states + self.attn1(attn_input, qkv_preprocessor=lambda q, k, v: self.rota1(q, k, v, freq_cis_img, to_cache=to_cache))

        # Cross-Attention
        attn_input = self.norm3(hidden_states)
        hidden_states = hidden_states + self.attn2(attn_input, text_emb, qkv_preprocessor=lambda q, k, v: self.rota2(q, k, v, freq_cis_img))

        # FFN Layer
        mlp_input = self.norm2(hidden_states)
        hidden_states = hidden_states + self.mlp(mlp_input)
        return hidden_states
    

class AttentionPool(torch.nn.Module):
    def __init__(self, spacial_dim, embed_dim, num_heads, output_dim = None):
        super().__init__()
        self.positional_embedding = torch.nn.Parameter(torch.randn(spacial_dim + 1, embed_dim) / embed_dim ** 0.5)
        self.k_proj = torch.nn.Linear(embed_dim, embed_dim)
        self.q_proj = torch.nn.Linear(embed_dim, embed_dim)
        self.v_proj = torch.nn.Linear(embed_dim, embed_dim)
        self.c_proj = torch.nn.Linear(embed_dim, output_dim or embed_dim)
        self.num_heads = num_heads

    def forward(self, x):
        x = x.permute(1, 0, 2)  # NLC -> LNC
        x = torch.cat([x.mean(dim=0, keepdim=True), x], dim=0)  # (L+1)NC
        x = x + self.positional_embedding[:, None, :].to(x.dtype)  # (L+1)NC
        x, _ = torch.nn.functional.multi_head_attention_forward(
            query=x[:1], key=x, value=x,
            embed_dim_to_check=x.shape[-1],
            num_heads=self.num_heads,
            q_proj_weight=self.q_proj.weight,
            k_proj_weight=self.k_proj.weight,
            v_proj_weight=self.v_proj.weight,
            in_proj_weight=None,
            in_proj_bias=torch.cat([self.q_proj.bias, self.k_proj.bias, self.v_proj.bias]),
            bias_k=None,
            bias_v=None,
            add_zero_attn=False,
            dropout_p=0,
            out_proj_weight=self.c_proj.weight,
            out_proj_bias=self.c_proj.bias,
            use_separate_proj_weight=True,
            training=self.training,
            need_weights=False
        )
        return x.squeeze(0)
    

class PatchEmbed(torch.nn.Module):
    def __init__(
        self,
        patch_size=(2, 2),
        in_chans=4,
        embed_dim=1408,
        bias=True,
    ):
        super().__init__()
        self.proj = torch.nn.Conv2d(in_chans, embed_dim, kernel_size=patch_size, stride=patch_size, bias=bias)

    def forward(self, x):
        x = self.proj(x)
        x = x.flatten(2).transpose(1, 2)  # BCHW -> BNC
        return x
    

def timestep_embedding(t, dim, max_period=10000, repeat_only=False):
    # https://github.com/openai/glide-text2im/blob/main/glide_text2im/nn.py
    if not repeat_only:
        half = dim // 2
        freqs = torch.exp(
            -math.log(max_period)
            * torch.arange(start=0, end=half, dtype=torch.float32)
            / half
        ).to(device=t.device)   # size: [dim/2], 一个指数衰减的曲线
        args = t[:, None].float() * freqs[None]
        embedding = torch.cat([torch.cos(args), torch.sin(args)], dim=-1)
        if dim % 2:
            embedding = torch.cat(
                [embedding, torch.zeros_like(embedding[:, :1])], dim=-1
            )
    else:
        embedding = repeat(t, "b -> b d", d=dim)
    return embedding
    

class TimestepEmbedder(torch.nn.Module):
    def __init__(self, hidden_size=1408, frequency_embedding_size=256):
        super().__init__()
        self.mlp = torch.nn.Sequential(
            torch.nn.Linear(frequency_embedding_size, hidden_size, bias=True),
            torch.nn.SiLU(),
            torch.nn.Linear(hidden_size, hidden_size, bias=True),
        )
        self.frequency_embedding_size = frequency_embedding_size

    def forward(self, t):
        t_freq = timestep_embedding(t, self.frequency_embedding_size).type(self.mlp[0].weight.dtype)
        t_emb = self.mlp(t_freq)
        return t_emb


class HunyuanDiT(torch.nn.Module):
    def __init__(self, num_layers_down=21, num_layers_up=19, in_channels=4, out_channels=8, hidden_dim=1408, text_dim=1024, t5_dim=2048, text_length=77, t5_length=256):
        super().__init__()

        # Embedders
        self.text_emb_padding = torch.nn.Parameter(torch.randn(text_length + t5_length, text_dim, dtype=torch.float32))
        self.t5_embedder = torch.nn.Sequential(
            torch.nn.Linear(t5_dim, t5_dim * 4, bias=True),
            FP32_SiLU(),
            torch.nn.Linear(t5_dim * 4, text_dim, bias=True),
        )
        self.t5_pooler = AttentionPool(t5_length, t5_dim, num_heads=8, output_dim=1024)
        self.style_embedder = torch.nn.Parameter(torch.randn(hidden_dim))
        self.patch_embedder = PatchEmbed(in_chans=in_channels)
        self.timestep_embedder = TimestepEmbedder()
        self.extra_embedder = torch.nn.Sequential(
            torch.nn.Linear(256 * 6 + 1024 + hidden_dim, hidden_dim * 4),
            FP32_SiLU(),
            torch.nn.Linear(hidden_dim * 4, hidden_dim),
        )

        # Transformer blocks
        self.num_layers_down = num_layers_down
        self.num_layers_up = num_layers_up
        self.blocks = torch.nn.ModuleList(
            [HunyuanDiTBlock(skip_connection=False) for _ in range(num_layers_down)] + \
            [HunyuanDiTBlock(skip_connection=True) for _ in range(num_layers_up)]
        )

        # Output layers
        self.final_layer = HunyuanDiTFinalLayer()
        self.out_channels = out_channels

    def prepare_text_emb(self, text_emb, text_emb_t5, text_emb_mask, text_emb_mask_t5):
        text_emb_mask = text_emb_mask.bool()
        text_emb_mask_t5 = text_emb_mask_t5.bool()
        text_emb_t5 = self.t5_embedder(text_emb_t5)
        text_emb = torch.cat([text_emb, text_emb_t5], dim=1)
        text_emb_mask = torch.cat([text_emb_mask, text_emb_mask_t5], dim=-1)
        text_emb = torch.where(text_emb_mask.unsqueeze(2), text_emb, self.text_emb_padding.to(text_emb))
        return text_emb
    
    def prepare_extra_emb(self, text_emb_t5, timestep, size_emb, dtype, batch_size):
        # Text embedding
        pooled_text_emb_t5 = self.t5_pooler(text_emb_t5)

        # Timestep embedding
        timestep_emb = self.timestep_embedder(timestep)

        # Size embedding
        size_emb = timestep_embedding(size_emb.view(-1), 256).to(dtype)
        size_emb = size_emb.view(-1, 6 * 256)

        # Style embedding
        style_emb = repeat(self.style_embedder, "D -> B D", B=batch_size)

        # Concatenate all extra vectors
        extra_emb = torch.cat([pooled_text_emb_t5, size_emb, style_emb], dim=1)
        condition_emb = timestep_emb + self.extra_embedder(extra_emb)

        return condition_emb

    def unpatchify(self, x, h, w):
        return rearrange(x, "B (H W) (P Q C) -> B C (H P) (W Q)", H=h, W=w, P=2, Q=2)
    
    def build_mask(self, data, is_bound):
        _, _, H, W = data.shape
        h = repeat(torch.arange(H), "H -> H W", H=H, W=W)
        w = repeat(torch.arange(W), "W -> H W", H=H, W=W)
        border_width = (H + W) // 4
        pad = torch.ones_like(h) * border_width
        mask = torch.stack([
            pad if is_bound[0] else h + 1,
            pad if is_bound[1] else H - h,
            pad if is_bound[2] else w + 1,
            pad if is_bound[3] else W - w
        ]).min(dim=0).values
        mask = mask.clip(1, border_width)
        mask = (mask / border_width).to(dtype=data.dtype, device=data.device)
        mask = rearrange(mask, "H W -> 1 H W")
        return mask
    
    def tiled_block_forward(self, block, hidden_states, condition_emb, text_emb, freq_cis_img, residual, torch_dtype, data_device, computation_device, tile_size, tile_stride):
        B, C, H, W = hidden_states.shape

        weight = torch.zeros((1, 1, H, W), dtype=torch_dtype, device=data_device)
        values = torch.zeros((B, C, H, W), dtype=torch_dtype, device=data_device)

        # Split tasks
        tasks = []
        for h in range(0, H, tile_stride):
            for w in range(0, W, tile_stride):
                if (h-tile_stride >= 0 and h-tile_stride+tile_size >= H) or (w-tile_stride >= 0 and w-tile_stride+tile_size >= W):
                    continue
                h_, w_ = h + tile_size, w + tile_size
                if h_ > H: h, h_ = H - tile_size, H
                if w_ > W: w, w_ = W - tile_size, W
                tasks.append((h, h_, w, w_))

        # Run
        for hl, hr, wl, wr in tasks:
            hidden_states_batch = hidden_states[:, :, hl:hr, wl:wr].to(computation_device)
            hidden_states_batch = rearrange(hidden_states_batch, "B C H W -> B (H W) C")
            if residual is not None:
                residual_batch = residual[:, :, hl:hr, wl:wr].to(computation_device)
                residual_batch = rearrange(residual_batch, "B C H W -> B (H W) C")
            else:
                residual_batch = None

            # Forward
            hidden_states_batch = block(hidden_states_batch, condition_emb, text_emb, freq_cis_img, residual_batch).to(data_device)
            hidden_states_batch = rearrange(hidden_states_batch, "B (H W) C -> B C H W", H=hr-hl)

            mask = self.build_mask(hidden_states_batch, is_bound=(hl==0, hr>=H, wl==0, wr>=W))
            values[:, :, hl:hr, wl:wr] += hidden_states_batch * mask
            weight[:, :, hl:hr, wl:wr] += mask
        values /= weight
        return values

    def forward(
        self, hidden_states, text_emb, text_emb_t5, text_emb_mask, text_emb_mask_t5, timestep, size_emb, freq_cis_img,
        tiled=False, tile_size=64, tile_stride=32,
        to_cache=False,
        use_gradient_checkpointing=False,
    ):
        # Embeddings
        text_emb = self.prepare_text_emb(text_emb, text_emb_t5, text_emb_mask, text_emb_mask_t5)
        condition_emb = self.prepare_extra_emb(text_emb_t5, timestep, size_emb, hidden_states.dtype, hidden_states.shape[0])
        
        # Input
        height, width = hidden_states.shape[-2], hidden_states.shape[-1]
        hidden_states = self.patch_embedder(hidden_states)

        # Blocks
        def create_custom_forward(module):
            def custom_forward(*inputs):
                return module(*inputs)
            return custom_forward
        if tiled:
            hidden_states = rearrange(hidden_states, "B (H W) C -> B C H W", H=height//2)
            residuals = []
            for block_id, block in enumerate(self.blocks):
                residual = residuals.pop() if block_id >= self.num_layers_down else None
                hidden_states = self.tiled_block_forward(
                    block, hidden_states, condition_emb, text_emb, freq_cis_img, residual,
                    torch_dtype=hidden_states.dtype, data_device=hidden_states.device, computation_device=hidden_states.device,
                    tile_size=tile_size, tile_stride=tile_stride
                )
                if block_id < self.num_layers_down - 2:
                    residuals.append(hidden_states)
            hidden_states = rearrange(hidden_states, "B C H W -> B (H W) C")
        else:
            residuals = []
            for block_id, block in enumerate(self.blocks):
                residual = residuals.pop() if block_id >= self.num_layers_down else None
                if self.training and use_gradient_checkpointing:
                    hidden_states = torch.utils.checkpoint.checkpoint(
                        create_custom_forward(block),
                        hidden_states, condition_emb, text_emb, freq_cis_img, residual,
                        use_reentrant=False,
                    )
                else:
                    hidden_states = block(hidden_states, condition_emb, text_emb, freq_cis_img, residual, to_cache=to_cache)
                if block_id < self.num_layers_down - 2:
                    residuals.append(hidden_states)

        # Output
        hidden_states = self.final_layer(hidden_states, condition_emb)
        hidden_states = self.unpatchify(hidden_states, height//2, width//2)
        hidden_states, _ = hidden_states.chunk(2, dim=1)
        return hidden_states
    
    def state_dict_converter(self):
        return HunyuanDiTStateDictConverter()



class HunyuanDiTStateDictConverter():
    def __init__(self):
        pass

    def from_diffusers(self, state_dict):
        state_dict_ = {}
        for name, param in state_dict.items():
            name_ = name
            name_ = name_.replace(".default_modulation.", ".modulation.")
            name_ = name_.replace(".mlp.fc1.", ".mlp.0.")
            name_ = name_.replace(".mlp.fc2.", ".mlp.2.")
            name_ = name_.replace(".attn1.q_norm.", ".rota1.q_norm.")
            name_ = name_.replace(".attn2.q_norm.", ".rota2.q_norm.")
            name_ = name_.replace(".attn1.k_norm.", ".rota1.k_norm.")
            name_ = name_.replace(".attn2.k_norm.", ".rota2.k_norm.")
            name_ = name_.replace(".q_proj.", ".to_q.")
            name_ = name_.replace(".out_proj.", ".to_out.")
            name_ = name_.replace("text_embedding_padding", "text_emb_padding")
            name_ = name_.replace("mlp_t5.0.", "t5_embedder.0.")
            name_ = name_.replace("mlp_t5.2.", "t5_embedder.2.")
            name_ = name_.replace("pooler.", "t5_pooler.")
            name_ = name_.replace("x_embedder.", "patch_embedder.")
            name_ = name_.replace("t_embedder.", "timestep_embedder.")
            name_ = name_.replace("t5_pooler.to_q.", "t5_pooler.q_proj.")
            name_ = name_.replace("style_embedder.weight", "style_embedder")
            if ".kv_proj." in name_:
                param_k = param[:param.shape[0]//2]
                param_v = param[param.shape[0]//2:]
                state_dict_[name_.replace(".kv_proj.", ".to_k.")] = param_k
                state_dict_[name_.replace(".kv_proj.", ".to_v.")] = param_v
            elif ".Wqkv." in name_:
                param_q = param[:param.shape[0]//3]
                param_k = param[param.shape[0]//3:param.shape[0]//3*2]
                param_v = param[param.shape[0]//3*2:]
                state_dict_[name_.replace(".Wqkv.", ".to_q.")] = param_q
                state_dict_[name_.replace(".Wqkv.", ".to_k.")] = param_k
                state_dict_[name_.replace(".Wqkv.", ".to_v.")] = param_v
            elif "style_embedder" in name_:
                state_dict_[name_] = param.squeeze()
            else:
                state_dict_[name_] = param
        return state_dict_
    
    def from_civitai(self, state_dict):
        return self.from_diffusers(state_dict)