import os
from importlib import import_module
from typing import List, Callable, Union, Optional

import PIL.Image
import torch
import torch.nn.functional as F
from torch import LongTensor, IntTensor, Tensor
from transformers import CLIPImageProcessor, CLIPVisionModel, SiglipImageProcessor, SiglipVisionModel
from transformers import PreTrainedModel, AutoModel, AutoTokenizer, AutoModelForCausalLM, AutoImageProcessor
from transformers.generation.utils import GenerateOutput

from .configuration_ovis import BaseVisualTokenizerConfig, ClipVisualTokenizerConfig, SiglipVisualTokenizerConfig
from .configuration_ovis import OvisConfig, ConversationFormatter, IGNORE_INDEX, IMAGE_TOKEN_INDEX


# ----------------------------------------------------------------------
#                            Visual Tokenizer
# ----------------------------------------------------------------------
class BaseVisualTokenizer(PreTrainedModel):
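    """Base class that maps an image to a sequence of probabilistic visual tokens.

    Subclasses plug in a concrete vision backbone (CLIP or SigLIP) and implement
    `get_image_size`/`encode`; this base class handles image preprocessing, the
    optional s2wrapper high-resolution boosting, and the tokenization of logits.
    """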
    base_model_prefix = "backbone"
    main_input_name = None
    _image_processor_class = None
    _image_processor_kwargs = {}
    _backbone_class = None
    _backbone_name_or_path = None

    def __init__(self, config: BaseVisualTokenizerConfig, *inputs, **kwargs):
        super().__init__(config, *inputs, **kwargs)
        if kwargs.get('train_from_scratch'):
            self.image_processor = self._image_processor_class.from_pretrained(
                self._backbone_name_or_path, **self._image_processor_kwargs)
            self.backbone = self._backbone_class.from_pretrained(
                self._backbone_name_or_path, **self.config.backbone_kwargs)
            self.config.backbone_config = self.backbone.config
        else:
            self.image_processor = AutoImageProcessor.from_pretrained(
                kwargs['image_processor_name_or_path'])
            self.backbone = AutoModel.from_config(self.config.backbone_config)
        self.head = None

        assert all((self.image_processor.do_resize,
                    not getattr(self.image_processor, 'do_center_crop', False),
                    self.image_processor.do_rescale,
                    self.image_processor.do_normalize
                    )), f"image_processor `{self.image_processor}` is not supported currently"

    def get_backbone(self):
        return self.backbone

    def get_image_processor(self):
        return self.image_processor

    def get_zero_pixel_values(self, n=1):
        height, width = self.get_image_size()
        if self.config.hd_booster is None:
            return torch.zeros(n, 3, height, width)
        elif self.config.hd_booster in ['s2wrapper', 's2wrapper-adaptive']:
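            # s2wrapper stacks the full image plus 4 quadrant crops -> 5 RGB views, i.e. 3 * 5 channels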
            return torch.zeros(n, 3 * 5, height, width)
        else:
            raise ValueError(f'Unsupported hd_booster {self.config.hd_booster}')

    def get_head(self):
        return self.head

    def get_image_size(self):
        raise NotImplementedError

    def preprocess_image(self, image: PIL.Image.Image, convert_to_rgb=True):
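        """Resize `image` to fit the backbone's square input (preserving aspect ratio),
        preprocess it with the image processor, and zero-pad it to a square.

        With an s2wrapper hd_booster, the full image and its four quadrants are each
        preprocessed and stacked along the channel dimension ([1, 3*5, side, side]);
        's2wrapper-adaptive' instead marks low-resolution inputs with `inf` so they
        can be detected and handled in `encode`.
        """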
        def _preprocess(img: PIL.Image.Image):
            # first resize (preserving aspect ratio) and preprocess
            sides = self.get_image_size()
            if sides[0] != sides[1]:
                raise ValueError('get_image_size() returns non-square size')
            side = sides[0]

            w, h = img.size
            if w == h:
                new_width = new_height = side
            elif w > h:
                new_width = side
                new_height = int(h / w * new_width)
            else:
                new_height = side
                new_width = int(w / h * new_height)
            new_size = dict(height=new_height, width=new_width)
            pixel_values = self.image_processor.preprocess(
                img, size=new_size, return_tensors='pt')['pixel_values']

            # then pad to square
            square_values = torch.zeros(
                [1, 3, side, side], dtype=pixel_values.dtype, device=pixel_values.device)
            new_height, new_width = pixel_values.shape[2:]
            if new_height == new_width:
                square_values[:, :, :, :] = pixel_values
            elif new_height > new_width:
                from_index = (side - new_width) // 2
                square_values[:, :, :, from_index:from_index + new_width] = pixel_values
            else:
                from_index = (side - new_height) // 2
                square_values[:, :, from_index:from_index + new_height, :] = pixel_values

            return square_values

        if convert_to_rgb and image.mode != 'RGB':
            image = image.convert('RGB')

        if self.config.hd_booster is None:
            return _preprocess(image)  # [1, 3, side, side]
        elif self.config.hd_booster in ['s2wrapper', 's2wrapper-adaptive']:
            width, height = image.size
            is_low_resolution = (height < self.get_image_size()[0] * 1.5 or
                                 width < self.get_image_size()[1] * 1.5)
            if self.config.hd_booster == 's2wrapper-adaptive' and is_low_resolution:
                values = self.get_zero_pixel_values() + torch.inf
                values[0][:3] = _preprocess(image)[0]
            else:
                center_x, center_y = width // 2, height // 2
                image_top_left = image.crop((0, 0, center_x, center_y))
                image_top_right = image.crop((center_x, 0, width, center_y))
                image_bottom_left = image.crop((0, center_y, center_x, height))
                image_bottom_right = image.crop((center_x, center_y, width, height))
                imgs = [image, image_top_left, image_top_right, image_bottom_left, image_bottom_right]
                values = torch.cat([_preprocess(img) for img in imgs], dim=1)
            return values  # [1, 3*5, side, side]
        else:
            raise ValueError(f'Unsupported hd_booster {self.config.hd_booster}')

    def get_backbone_layer(self, index):
        return self.backbone.vision_model.encoder.layers[index]

    def tokenize(self, logits):
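        """Convert per-patch logits into visual tokens over the visual vocabulary.

        'softmax' yields soft probability tokens, while 'gumbel_argmax' and
        'st_argmax' yield (approximately) one-hot tokens whose gradients flow
        through a Gumbel-softmax or straight-through estimator, respectively.
        """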
        def st_argmax(y_soft, dim):  # straight-through argmax
            index = y_soft.max(dim, keepdim=True)[1]
            y_hard = torch.zeros_like(
                y_soft, memory_format=torch.legacy_contiguous_format).scatter_(dim, index, 1.0)
            ret = y_hard - y_soft.detach() + y_soft
            return ret

        if self.config.tokenize_function == 'softmax':
            tokens = F.softmax(logits, dim=-1)
        elif self.config.tokenize_function == 'gumbel_argmax':
            tokens = F.gumbel_softmax(logits, tau=self.config.tau, hard=True)
        elif self.config.tokenize_function == 'st_argmax':
            tokens = st_argmax(logits, dim=-1)
        else:
            raise ValueError(
                f'Invalid `tokenize_function`, expected softmax, gumbel_argmax, or st_argmax,'
                f' but got {self.config.tokenize_function}')
        return tokens


class ClipVisualTokenizer(BaseVisualTokenizer):
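    """Visual tokenizer backed by a CLIP vision model (default: openai/clip-vit-large-patch14-336)."""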
    config_class = ClipVisualTokenizerConfig
    supports_gradient_checkpointing = True
    _no_split_modules = ["CLIPEncoderLayer"]
    _image_processor_class = CLIPImageProcessor
    _image_processor_kwargs = dict(do_center_crop=False)
    _backbone_class = CLIPVisionModel
    _backbone_name_or_path = "openai/clip-vit-large-patch14-336"

    def __init__(self, config: ClipVisualTokenizerConfig = None, *inputs, **kwargs):
        super().__init__(config, *inputs, **kwargs)
        head_dim = self.config.vocab_size
        if self.config.use_indicators:
            head_dim -= 2  # reserved for two image indicator tokens
        if self.config.hd_booster is None:
            self.head = torch.nn.Sequential(
                torch.nn.Linear(self.backbone.config.hidden_size, head_dim, bias=False),
                torch.nn.LayerNorm(head_dim)
            )
        elif self.config.hd_booster in ['s2wrapper', 's2wrapper-adaptive']:
            self.head = torch.nn.Sequential(
                torch.nn.Linear(self.backbone.config.hidden_size * 2, head_dim, bias=False),
                torch.nn.LayerNorm(head_dim)
            )
        else:
            raise ValueError(f'Unsupported hd_booster {self.config.hd_booster}')

    def get_image_size(self):
        height = self.image_processor.crop_size["height"]
        width = self.image_processor.crop_size["width"]
        return height, width

    def encode(self, pixel_values):
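        """Return backbone features for `pixel_values`.

        Without an hd_booster this is simply the last hidden state (optionally
        dropping the CLS token). With s2wrapper, the 5 stacked views (full image
        plus 4 quadrants) are encoded separately, the quadrant features are
        reassembled into a 2x-resolution grid, area-pooled back to the base grid
        size, and concatenated with the full-view features along the feature
        dimension, giving [n, l, 2*d].
        """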
        if self.config.hd_booster is None:
            output = self.backbone(pixel_values, output_hidden_states=True, return_dict=True)
            features = output.hidden_states[-1]
            if self.config.drop_cls_token:
                features = features[:, 1:, :]
        elif self.config.hd_booster in ['s2wrapper', 's2wrapper-adaptive']:
            n, c, side, _ = pixel_values.shape
            if self.config.hd_booster == 's2wrapper-adaptive':
                pixel_values_mask = torch.isinf(pixel_values)  # [n, c, side, side]
                pixel_values = torch.masked_fill(pixel_values, pixel_values_mask, 0.0)
            pixel_values = pixel_values.reshape(n * 5, c // 5, side, side)
            output = self.backbone(pixel_values, output_hidden_states=True, return_dict=True)
            features = output.hidden_states[-1]
            if self.config.drop_cls_token:
                features = features[:, 1:, :]
            _, l, d = features.shape
            features = features.reshape(n, 5, l, d)
            features_overall = features[:, 0, :, :]  # [n, l, d]
            features_parts = features[:, 1:, :, :]  # [n, 4, l, d]
            sqrt_l = int(l ** 0.5)
            assert sqrt_l ** 2 == l, "The token sequence length should be a perfect square."
            features_parts = features_parts.reshape(n, 4, sqrt_l, sqrt_l, d)  # [n, 4, sqrt(l), sqrt(l), d]
            features_top = torch.concat(
                [features_parts[:, 0, :, :, :], features_parts[:, 1, :, :, :]], dim=-2)  # [n, sqrt(l), sqrt(l)*2, d]
            features_bottom = torch.concat(
                [features_parts[:, 2, :, :, :], features_parts[:, 3, :, :, :]], dim=-2)  # [n, sqrt(l), sqrt(l)*2, d]
            features_merge = torch.concat([features_top, features_bottom], dim=-3)  # [n, sqrt(l)*2, sqrt(l)*2, d]
            features_pool = F.interpolate(
                features_merge.permute(0, 3, 1, 2).to(torch.float32),
                size=sqrt_l,
                mode='area'
            )  # [n, d, sqrt_l, sqrt_l]
            features_pool = features_pool.flatten(2).permute(0, 2, 1).to(features.dtype)  # [n, l, d]
            if self.config.hd_booster == 's2wrapper-adaptive':
                features_pool_mask = torch.unsqueeze(
                    torch.unsqueeze(pixel_values_mask[:, -1, -1, -1], dim=-1), dim=-1)  # [n, 1, 1]
                features_pool = torch.masked_fill(features_pool, features_pool_mask, 0.0)
            features = torch.cat([features_overall, features_pool], dim=-1)  # [n, l, 2*d]
        else:
            raise ValueError(f'Unsupported hd_booster {self.config.hd_booster}')
        return features

    def forward(self, pixel_values) -> Tensor:  # [BatchSize, ImageShape] -> [BatchSize, #Token, VocabSize]
        features = self.encode(pixel_values)
        logits = self.head(features)
        tokens = self.tokenize(logits)
        if self.config.use_indicators:
            # tokens' shape is [BatchSize, #Token, VocabSize-2], so pad it with a zero tensor of shape
            # [BatchSize, #Token, 2]; afterwards, tokens' shape becomes [BatchSize, #Token, VocabSize]
            batch_size, token_len, _ = tokens.shape
            padding_tensor = torch.zeros(
                size=(batch_size, token_len, 2),
                dtype=tokens.dtype,
                device=tokens.device,
                layout=tokens.layout,
                requires_grad=False
            )
            tokens = torch.cat((tokens, padding_tensor), dim=2)

            # adding indicator tokens, after which tokens' shape should become [BatchSize, 1+#Token+1, VocabSize]
            begin_indicator = torch.zeros(
                size=(batch_size, 1),
                dtype=torch.long,
                device=tokens.device,
                requires_grad=False
            ) + self.config.vocab_size - 2
            begin_indicator_token = F.one_hot(
                begin_indicator, num_classes=self.config.vocab_size).to(dtype=tokens.dtype)
            end_indicator = torch.zeros(
                size=(batch_size, 1),
                dtype=torch.long,
                device=tokens.device,
                requires_grad=False
            ) + self.config.vocab_size - 1
            end_indicator_token = F.one_hot(
                end_indicator, num_classes=self.config.vocab_size).to(dtype=tokens.dtype)
            tokens = torch.cat((begin_indicator_token, tokens, end_indicator_token), dim=1)
        return tokens


class SiglipVisualTokenizer(BaseVisualTokenizer):
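    """Visual tokenizer backed by a SigLIP vision model (default: google/siglip-so400m-patch14-384).

    Mirrors `ClipVisualTokenizer`, but additionally folds each `hidden_stride x hidden_stride`
    block of neighboring patch features into a single token in `encode` to shorten the
    visual token sequence.
    """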
    config_class = SiglipVisualTokenizerConfig
    supports_gradient_checkpointing = True
    _no_split_modules = ["SiglipVisionTransformer"]
    _image_processor_class = SiglipImageProcessor
    _image_processor_kwargs = {}
    _backbone_class = SiglipVisionModel
    _backbone_name_or_path = "google/siglip-so400m-patch14-384"

    def __init__(self, config: SiglipVisualTokenizerConfig = None, *inputs, **kwargs):
        super().__init__(config, *inputs, **kwargs)
        head_dim = self.config.vocab_size
        if self.config.use_indicators:
            head_dim -= 2  # reserved for two image indicator tokens
        if self.config.hd_booster is None:
            self.head = torch.nn.Sequential(
                torch.nn.Linear(
                    self.backbone.config.hidden_size * self.config.hidden_stride * self.config.hidden_stride,
                    head_dim,
                    bias=False
                ),
                torch.nn.LayerNorm(head_dim)
            )
        elif self.config.hd_booster in ['s2wrapper', 's2wrapper-adaptive']:
            self.head = torch.nn.Sequential(
                torch.nn.Linear(
                    self.backbone.config.hidden_size * self.config.hidden_stride * self.config.hidden_stride * 2,
                    head_dim,
                    bias=False
                ),
                torch.nn.LayerNorm(head_dim)
            )
        else:
            raise ValueError(f'Unsupported hd_booster {self.config.hd_booster}')

    def get_image_size(self):
        height = self.image_processor.size["height"]
        width = self.image_processor.size["width"]
        return height, width

    def encode(self, pixel_values):
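        """Same scale-merging logic as `ClipVisualTokenizer.encode`, followed by the
        `hidden_stride` folding documented below."""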
        if self.config.hd_booster is None:
            output = self.backbone(pixel_values, output_hidden_states=True, return_dict=True)
            features = output.hidden_states[-1]
            if self.config.drop_cls_token:
                features = features[:, 1:, :]
        elif self.config.hd_booster in ['s2wrapper', 's2wrapper-adaptive']:
            n, c, side, _ = pixel_values.shape
            if self.config.hd_booster == 's2wrapper-adaptive':
                pixel_values_mask = torch.isinf(pixel_values)  # [n, c, side, side]
                pixel_values = torch.masked_fill(pixel_values, pixel_values_mask, 0.0)
            pixel_values = pixel_values.reshape(n * 5, c // 5, side, side)
            output = self.backbone(pixel_values, output_hidden_states=True, return_dict=True)
            features = output.hidden_states[-1]
            if self.config.drop_cls_token:
                features = features[:, 1:, :]
            _, l, d = features.shape
            features = features.reshape(n, 5, l, d)
            features_overall = features[:, 0, :, :]  # [n, l, d]
            features_parts = features[:, 1:, :, :]  # [n, 4, l, d]
            sqrt_l = int(l ** 0.5)
            assert sqrt_l ** 2 == l, "The token sequence length should be a perfect square."
            features_parts = features_parts.reshape(n, 4, sqrt_l, sqrt_l, d)  # [n, 4, sqrt(l), sqrt(l), d]
            features_top = torch.concat(
                [features_parts[:, 0, :, :, :], features_parts[:, 1, :, :, :]], dim=-2)  # [n, sqrt(l), sqrt(l)*2, d]
            features_bottom = torch.concat(
                [features_parts[:, 2, :, :, :], features_parts[:, 3, :, :, :]], dim=-2)  # [n, sqrt(l), sqrt(l)*2, d]
            features_merge = torch.concat([features_top, features_bottom], dim=-3)  # [n, sqrt(l)*2, sqrt(l)*2, d]
            features_pool = F.interpolate(
                features_merge.permute(0, 3, 1, 2).to(torch.float32),
                size=sqrt_l,
                mode='area'
            )  # [n, d, sqrt_l, sqrt_l]
            features_pool = features_pool.flatten(2).permute(0, 2, 1).to(features.dtype)  # [n, l, d]
            if self.config.hd_booster == 's2wrapper-adaptive':
                features_pool_mask = torch.unsqueeze(
                    torch.unsqueeze(pixel_values_mask[:, -1, -1, -1], dim=-1), dim=-1)  # [n, 1, 1]
                features_pool = torch.masked_fill(features_pool, features_pool_mask, 0.0)
            features = torch.cat([features_overall, features_pool], dim=-1)  # [n, l, 2*d]
        else:
            raise ValueError(f'Unsupported hd_booster {self.config.hd_booster}')

        # merge each `hidden_stride * hidden_stride` block of neighboring hidden states into one token to reduce
        # the token sequence length, e.g., for hidden_stride=3: 729 -> 81 tokens
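        # Shape walk-through for hidden_stride=3 and l=729:
        #   [n, 729, d] -> [n, 27, 27, d] -> [n, 9, 3, 9, 3, d] -> [n, 9, 9, 3, 3, d]
        #   -> [n, 9, 9, 9*d] -> [n, 81, 9*d]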
        if self.config.hidden_stride > 1:
            n, l, d = features.shape  # this `d` may differ from the `d` above (it is 2*d under s2wrapper)
            sqrt_l = int(l ** 0.5)
            assert sqrt_l ** 2 == l, "The token sequence length should be a perfect square."
            assert l % (self.config.hidden_stride ** 2) == 0, \
                "The token sequence length should be divisible by `hidden_stride**2`."
            features = features.reshape(n, sqrt_l, sqrt_l, d)
            features = features.reshape(n, sqrt_l // self.config.hidden_stride, self.config.hidden_stride,
                                        sqrt_l // self.config.hidden_stride, self.config.hidden_stride, d)
            features = features.permute(0, 1, 3, 2, 4, 5)  # [n, sqrt_l/hs, sqrt_l/hs, hs, hs, d]
            features = features.flatten(3)  # [n, sqrt_l/hs, sqrt_l/hs, hs*hs*d]
            features = features.reshape(n, l // (self.config.hidden_stride * self.config.hidden_stride),
                                        self.config.hidden_stride * self.config.hidden_stride * d)

        return features

    def forward(self, pixel_values) -> Tensor:  # [BatchSize, ImageShape] -> [BatchSize, #Token, VocabSize]
        features = self.encode(pixel_values)
        logits = self.head(features)
        tokens = self.tokenize(logits)
        if self.config.use_indicators:
            # tokens' shape is [BatchSize, #Token, VocabSize-2], so pad it with a zero tensor of shape
            # [BatchSize, #Token, 2]; afterwards, tokens' shape becomes [BatchSize, #Token, VocabSize]
            batch_size, token_len, _ = tokens.shape
            padding_tensor = torch.zeros(
                size=(batch_size, token_len, 2),
                dtype=tokens.dtype,
                device=tokens.device,
                layout=tokens.layout,
                requires_grad=False
            )
            tokens = torch.cat((tokens, padding_tensor), dim=2)

            # adding indicator tokens, after which tokens' shape should become [BatchSize, 1+#Token+1, VocabSize]
            begin_indicator = torch.zeros(
                size=(batch_size, 1),
                dtype=torch.long,
                device=tokens.device,
                requires_grad=False
            ) + self.config.vocab_size - 2
            begin_indicator_token = F.one_hot(
                begin_indicator, num_classes=self.config.vocab_size).to(dtype=tokens.dtype)
            end_indicator = torch.zeros(
                size=(batch_size, 1),
                dtype=torch.long,
                device=tokens.device,
                requires_grad=False
            ) + self.config.vocab_size - 1
            end_indicator_token = F.one_hot(
                end_indicator, num_classes=self.config.vocab_size).to(dtype=tokens.dtype)
            tokens = torch.cat((begin_indicator_token, tokens, end_indicator_token), dim=1)
        return tokens


AutoModel.register(ClipVisualTokenizerConfig, ClipVisualTokenizer)
AutoModel.register(SiglipVisualTokenizerConfig, SiglipVisualTokenizer)


# ----------------------------------------------------------------------
#                                  Ovis
# ----------------------------------------------------------------------
class VisualEmbedding(torch.nn.Embedding):
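    """Embedding table that also accepts soft visual tokens.

    Integer ids use the standard embedding lookup; floating-point token
    distributions are matrix-multiplied with the embedding weight, yielding the
    expected embedding under that distribution.
    """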
    def forward(self, input: Tensor) -> Tensor:
        if isinstance(input, (LongTensor, IntTensor)):
            return super().forward(input)
        return torch.matmul(input, self.weight)


class OvisPreTrainedModel(PreTrainedModel):
    config_class = OvisConfig
    base_model_prefix = "ovis"


class Ovis(OvisPreTrainedModel):
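    """Multimodal model that couples a causal LLM with a visual tokenizer.

    Images are turned into visual tokens, embedded via `VisualEmbedding`, and
    spliced into the text embedding sequence at IMAGE_TOKEN_INDEX placeholders
    (see `merge_multimodal`) before being fed to the LLM.
    """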

    def __init__(self, config: OvisConfig, *inputs, **kwargs):
        super().__init__(config, *inputs, **kwargs)
        self.llm = AutoModelForCausalLM.from_config(self.config.llm_config)
        assert self.config.hidden_size == self.llm.config.hidden_size, "hidden size mismatch"
        self.text_tokenizer = AutoTokenizer.from_pretrained(self.config.name_or_path)
        self.visual_tokenizer = AutoModel.from_config(
            self.config.visual_tokenizer_config,
            image_processor_name_or_path=self.config.name_or_path
        )
        self.vte = VisualEmbedding(
            self.config.visual_tokenizer_config.vocab_size,
            self.config.hidden_size,
            device=self.visual_tokenizer.device,
            dtype=self.visual_tokenizer.dtype
        )

        def _merge_modules(modules_list: tuple):
            merged_modules = []
            for modules in modules_list:
                merged_modules.extend(modules if modules else [])
            return merged_modules

        self._no_split_modules = _merge_modules(
            (self.llm._no_split_modules, self.visual_tokenizer._no_split_modules))
        self._skip_keys_device_placement = self.llm._skip_keys_device_placement
        self._keep_in_fp32_modules = _merge_modules(
            (self.llm._keep_in_fp32_modules, self.visual_tokenizer._keep_in_fp32_modules))
        self.is_parallelizable = all((self.llm.is_parallelizable, self.visual_tokenizer.is_parallelizable))
        self.supports_gradient_checkpointing = all(
            (self.llm.supports_gradient_checkpointing, self.visual_tokenizer.supports_gradient_checkpointing))
        self._supports_flash_attn_2 = all(
            (self.llm._supports_flash_attn_2, self.visual_tokenizer._supports_flash_attn_2))
        self._supports_sdpa = all((self.llm._supports_sdpa, self.visual_tokenizer._supports_sdpa))

    def get_text_tokenizer(self):
        return self.text_tokenizer

    def get_visual_tokenizer(self):
        return self.visual_tokenizer

    def get_llm(self):
        return self.llm

    def get_vte(self):
        return self.vte

    def get_wte(self):
        return self.llm.get_input_embeddings()

    def get_conversation_formatter(self) -> ConversationFormatter:
        if getattr(self, 'conversation_formatter', None) is None:
            self.conversation_formatter = getattr(
                import_module(".configuration_ovis", __package__),
                self.config.conversation_formatter_class
            )(self.text_tokenizer)
        return self.conversation_formatter

    def forward(
        self,
        input_ids: torch.Tensor,
        attention_mask: torch.Tensor,
        labels: Optional[torch.Tensor],
        pixel_values: List[Optional[torch.Tensor]],
        **kwargs
    ):
        assert self.training, "`forward` can only be used in training. For inference, use `generate`."
        _, inputs_embeds, labels, attention_mask = self.merge_multimodal(
            text_input_ids=input_ids,
            text_attention_masks=attention_mask,
            text_labels=labels,
            pixel_values=pixel_values
        )
        return self.llm(inputs_embeds=inputs_embeds, labels=labels, attention_mask=attention_mask, **kwargs)

    def merge_multimodal(
        self,
        text_input_ids: torch.Tensor,
        text_attention_masks: torch.Tensor,
        text_labels: Optional[torch.Tensor],
        pixel_values: List[Optional[torch.Tensor]]
    ):
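        """Splice visual embeddings into the text embedding sequence.

        Each IMAGE_TOKEN_INDEX placeholder in `text_input_ids` is replaced by the
        visual embeddings of the corresponding image; visual positions receive
        IGNORE_INDEX labels and an all-ones attention mask. Returns the per-sample
        visual input ids plus the padded batch of input embeddings, labels, and
        attention masks, each truncated to `multimodal_max_length`.
        """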
        input_device = text_input_ids.device
        if self.training:
            # When training, to be compatible with deepspeed zero, each sample has to include a pixel_values tensor.
            # For a text-only sample, one can simply use an all-zero tensor as pixel_values, which will be ignored
            # (see below in this function), so the gradient will not be affected.
            num_images = [x.shape[0] for x in pixel_values]
            visual_tokens = self.visual_tokenizer(torch.cat(pixel_values, dim=0))
            visual_embeds = torch.split(
                self.get_vte()(visual_tokens).to(dtype=self.dtype, device=input_device),
                split_size_or_sections=num_images,
                dim=0
            )
            visual_input_ids = torch.split(
                torch.argmax(visual_tokens, dim=-1).to(device=input_device),
                split_size_or_sections=num_images,
                dim=0
            )
            visual_labels = [
                torch.full(
                    x.shape, IGNORE_INDEX, dtype=torch.long, device=input_device
                ) for x in visual_input_ids
            ]
        else:
            # At inference time, a sample may be text-only, in which case its pixel_values entry is `None`
            num_images = [x.shape[0] if x is not None else 0 for x in pixel_values]
            if sum(num_images) > 0:
                visual_tokens = self.visual_tokenizer(torch.cat([x for x in pixel_values if x is not None], dim=0))
                visual_embeds = torch.split(
                    self.get_vte()(visual_tokens).to(dtype=self.dtype, device=input_device),
                    split_size_or_sections=num_images,
                    dim=0
                )
                visual_input_ids = torch.split(
                    torch.argmax(visual_tokens, dim=-1).to(device=input_device),
                    split_size_or_sections=num_images,
                    dim=0
                )
                visual_labels = [
                    torch.full(
                        x.shape, IGNORE_INDEX, dtype=torch.long, device=input_device
                    ) for x in visual_input_ids
                ]
            else:
                # just placeholders
                visual_embeds = [None] * len(num_images)
                visual_input_ids = [None] * len(num_images)
                visual_labels = [None] * len(num_images)
            # just placeholders
            text_labels = torch.full(text_input_ids.shape, IGNORE_INDEX, dtype=torch.long, device=input_device)

        input_embeds = []
        attention_masks = []
        labels = []
        for text_input_id, text_label, text_attention_mask, visual_embed, visual_input_id, visual_label in zip(
            text_input_ids, text_labels, text_attention_masks, visual_embeds, visual_input_ids, visual_labels
        ):
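            # Split the text at each IMAGE_TOKEN_INDEX placeholder and interleave the
            # corresponding image's visual embeddings between the text segments.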
            image_token_mask = torch.eq(text_input_id, IMAGE_TOKEN_INDEX)
            text_embed = self.get_wte()(torch.masked_fill(text_input_id, image_token_mask, 0))
            image_token_positions = torch.where(image_token_mask)[0].tolist()
            if len(image_token_positions) > 0:
                input_embed_parts = []
                attention_mask_parts = []
                label_parts = []
                prev_image_token_position = -1
                for index, image_token_position in enumerate(image_token_positions):
                    input_embed_parts.append(
                        text_embed[prev_image_token_position + 1:image_token_position, :])
                    label_parts.append(
                        text_label[prev_image_token_position + 1:image_token_position])
                    attention_mask_parts.append(
                        text_attention_mask[prev_image_token_position + 1:image_token_position])
                    input_embed_parts.append(visual_embed[index])
                    attention_mask_parts.append(
                        torch.ones_like(visual_label[index], dtype=torch.bool))
                    label_parts.append(visual_label[index])
                    prev_image_token_position = image_token_position
                if prev_image_token_position + 1 < text_input_id.shape[0]:
                    input_embed_parts.append(
                        text_embed[prev_image_token_position + 1:, :])
                    attention_mask_parts.append(
                        text_attention_mask[prev_image_token_position + 1:])
                    label_parts.append(
                        text_label[prev_image_token_position + 1:])
                input_embed = torch.cat(input_embed_parts, dim=0)
                attention_mask = torch.cat(attention_mask_parts, dim=0)
                label = torch.cat(label_parts, dim=0)
            else:
                input_embed = text_embed
                attention_mask = text_attention_mask
                label = text_label
                if self.training:
                    # Make visual_embed involved in the backward graph,
                    # to be compatible with deepspeed zero and ddp.
                    input_embed += torch.sum(visual_embed * 0.0)
            input_embeds.append(input_embed)
            attention_masks.append(attention_mask)
            labels.append(label)

        batch_input_embeds = torch.nn.utils.rnn.pad_sequence(
            input_embeds, batch_first=True, padding_value=0.0)[:, :self.config.multimodal_max_length, :]
        batch_attention_mask = torch.nn.utils.rnn.pad_sequence(
            attention_masks, batch_first=True, padding_value=False)[:, :self.config.multimodal_max_length]
        batch_labels = torch.nn.utils.rnn.pad_sequence(
            labels, batch_first=True, padding_value=IGNORE_INDEX)[:, :self.config.multimodal_max_length]

        return visual_input_ids, batch_input_embeds, batch_labels, batch_attention_mask

    def save_pretrained(
        self,
        save_directory: Union[str, os.PathLike],
        is_main_process: bool = True,
        state_dict: Optional[dict] = None,
        save_function: Callable = torch.save,
        push_to_hub: bool = False,
        max_shard_size: Union[int, str] = "5GB",
        safe_serialization: bool = True,
        variant: Optional[str] = None,
        token: Optional[Union[str, bool]] = None,
        save_peft_format: bool = True,
        **kwargs
    ):
        super().save_pretrained(save_directory,
                                is_main_process=is_main_process,
                                state_dict=state_dict,
                                save_function=save_function,
                                safe_serialization=safe_serialization)
        self.get_text_tokenizer().save_pretrained(save_directory)
        self.get_visual_tokenizer().get_image_processor().save_pretrained(save_directory)

        # uncommenting the following will additionally save a separate copy of the visual tokenizer
        # visual_tokenizer_directory = os.path.join(save_directory, 'visual_tokenizer')
        # self.get_visual_tokenizer().save_pretrained(visual_tokenizer_directory,
        #                                             is_main_process=is_main_process,
        #                                             state_dict=None,
        #                                             save_function=save_function,
        #                                             safe_serialization=safe_serialization)
        # self.get_visual_tokenizer().get_image_processor().save_pretrained(visual_tokenizer_directory)

    # TODO: support batch generation
    def generate(
        self,
        inputs: Optional[torch.Tensor] = None,
        **kwargs
    ) -> Union[GenerateOutput, torch.LongTensor]:
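        """Multimodal generation: build `inputs_embeds` via `merge_multimodal`, then
        delegate to the underlying LLM's `generate`."""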
        assert inputs.shape[0] == 1, 'Currently, only `batch_size=1` is supported'
        _, inputs_embeds, labels, attention_mask = self.merge_multimodal(
            text_input_ids=inputs,
            text_attention_masks=kwargs.pop('attention_mask'),
            text_labels=None,
            pixel_values=kwargs.pop('pixel_values')
        )
        if getattr(self.generation_config, 'cache_implementation', None) == 'hybrid':  # mainly for Gemma2
            kwargs['past_key_values'] = self.get_llm()._get_cache(
                'hybrid', kwargs.get('num_beams', 1), kwargs['max_new_tokens'] + inputs_embeds.shape[-2]
            )
            self.get_llm()._supports_cache_class = True
            kwargs['cache_implementation'] = None

        return self.llm.generate(inputs=None, inputs_embeds=inputs_embeds, attention_mask=attention_mask, **kwargs)
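

# ----------------------------------------------------------------------
#                       Usage sketch (inference)
# ----------------------------------------------------------------------
# A minimal, non-authoritative sketch of how the pieces above fit together at
# inference time. The checkpoint id, prompt construction, and generation
# settings are illustrative assumptions, not part of this module; the exact
# prompt format is produced by the ConversationFormatter configured for the
# checkpoint.
#
#   model = Ovis.from_pretrained('<ovis-checkpoint>', torch_dtype=torch.bfloat16)
#   visual_tokenizer = model.get_visual_tokenizer()
#
#   image = PIL.Image.open('example.jpg')
#   pixel_values = [visual_tokenizer.preprocess_image(image).to(
#       dtype=visual_tokenizer.dtype, device=visual_tokenizer.device)]
#
#   # `input_ids` must contain IMAGE_TOKEN_INDEX placeholders where the visual
#   # embeddings should be spliced in (see `merge_multimodal`); they are normally
#   # built with model.get_conversation_formatter().
#   output_ids = model.generate(input_ids,
#                               attention_mask=attention_mask,
#                               pixel_values=pixel_values,
#                               max_new_tokens=512)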