mazpie's picture
Initial commit
2d9a728
raw
history blame
9.41 kB
import os
import logging
import torch
from einops import rearrange
from torch import nn
import math
# from .criterions import VTC_VTM_Loss
from .simple_tokenizer import SimpleTokenizer as _Tokenizer
from .viclip_vision import clip_joint_l14, clip_joint_b16
from .viclip_text import clip_text_l14, clip_text_b16
logger = logging.getLogger(__name__)
class ViCLIP(nn.Module):
"""docstring for ViCLIP"""
def __init__(self,
tokenizer=None,
size='l',
pretrain=os.path.join(os.path.dirname(os.path.abspath(__file__)), "ViClip-InternVid-10M-FLT.pth"),
freeze_text=True):
super(ViCLIP, self).__init__()
if tokenizer:
self.tokenizer = tokenizer
else:
self.tokenizer = _Tokenizer()
self.max_txt_l = 32
if size.lower() == 'l':
self.vision_encoder_name = 'vit_l14'
elif size.lower() == 'b':
self.vision_encoder_name = 'vit_b16'
else:
raise NotImplementedError(f"Size {size} not implemented")
self.vision_encoder_pretrained = False
self.inputs_image_res = 224
self.vision_encoder_kernel_size = 1
self.vision_encoder_center = True
self.video_input_num_frames = 8
self.vision_encoder_drop_path_rate = 0.1
self.vision_encoder_checkpoint_num = 24
self.is_pretrain = pretrain
self.vision_width = 1024
self.text_width = 768
self.embed_dim = 768
self.masking_prob = 0.9
if size.lower() == 'l':
self.text_encoder_name = 'vit_l14'
elif size.lower() == 'b':
self.text_encoder_name = 'vit_b16'
else:
raise NotImplementedError(f"Size {size} not implemented")
self.text_encoder_pretrained = False#'bert-base-uncased'
self.text_encoder_d_model = 768
self.text_encoder_vocab_size = 49408
# create modules.
self.vision_encoder = self.build_vision_encoder()
self.text_encoder = self.build_text_encoder()
self.temp = nn.parameter.Parameter(torch.ones([]) * 1 / 100.0)
self.temp_min = 1 / 100.0
if pretrain:
logger.info(f"Load pretrained weights from {pretrain}")
state_dict = torch.load(pretrain, map_location='cpu')['model']
self.load_state_dict(state_dict)
# Freeze weights
if freeze_text:
self.freeze_text()
def freeze_text(self):
"""freeze text encoder"""
for p in self.text_encoder.parameters():
p.requires_grad = False
def no_weight_decay(self):
ret = {"temp"}
ret.update(
{"vision_encoder." + k for k in self.vision_encoder.no_weight_decay()}
)
ret.update(
{"text_encoder." + k for k in self.text_encoder.no_weight_decay()}
)
return ret
def forward(self, image, text, raw_text, idx, log_generation=None, return_sims=False):
"""forward and calculate loss.
Args:
image (torch.Tensor): The input images. Shape: [B,T,C,H,W].
text (dict): TODO
idx (torch.Tensor): TODO
Returns: TODO
"""
self.clip_contrastive_temperature()
vision_embeds = self.encode_vision(image)
text_embeds = self.encode_text(raw_text)
if return_sims:
sims = torch.nn.functional.normalize(vision_embeds, dim=-1) @ \
torch.nn.functional.normalize(text_embeds, dim=-1).transpose(0, 1)
return sims
# calculate loss
## VTC loss
loss_vtc = self.clip_loss.vtc_loss(
vision_embeds, text_embeds, idx, self.temp, all_gather=True
)
return dict(
loss_vtc=loss_vtc,
)
def encode_vision(self, image, test=False):
"""encode image / videos as features.
Args:
image (torch.Tensor): The input images.
test (bool): Whether testing.
Returns: tuple.
- vision_embeds (torch.Tensor): The features of all patches. Shape: [B,T,L,C].
- pooled_vision_embeds (torch.Tensor): The pooled features. Shape: [B,T,C].
"""
if image.ndim == 5:
image = image.permute(0, 2, 1, 3, 4).contiguous()
else:
image = image.unsqueeze(2)
if not test and self.masking_prob > 0.0:
return self.vision_encoder(
image, masking_prob=self.masking_prob
)
return self.vision_encoder(image)
def encode_text(self, text):
"""encode text.
Args:
text (dict): The output of huggingface's `PreTrainedTokenizer`. contains keys:
- input_ids (torch.Tensor): Token ids to be fed to a model. Shape: [B,L].
- attention_mask (torch.Tensor): The mask indicate padded tokens. Shape: [B,L]. 0 is padded token.
- other keys refer to "https://huggingface.co/docs/transformers/v4.21.2/en/main_classes/tokenizer#transformers.PreTrainedTokenizer.__call__".
Returns: tuple.
- text_embeds (torch.Tensor): The features of all tokens. Shape: [B,L,C].
- pooled_text_embeds (torch.Tensor): The pooled features. Shape: [B,C].
"""
device = next(self.text_encoder.parameters()).device
text = self.text_encoder.tokenize(
text, context_length=self.max_txt_l
).to(device)
text_embeds = self.text_encoder(text)
return text_embeds
@torch.no_grad()
def clip_contrastive_temperature(self, min_val=0.001, max_val=0.5):
"""Seems only used during pre-training"""
self.temp.clamp_(min=self.temp_min)
def build_vision_encoder(self):
"""build vision encoder
Returns: (vision_encoder, vision_layernorm). Each is a `nn.Module`.
"""
encoder_name = self.vision_encoder_name
if encoder_name == "vit_l14":
vision_encoder = clip_joint_l14(
pretrained=self.vision_encoder_pretrained,
input_resolution=self.inputs_image_res,
kernel_size=self.vision_encoder_kernel_size,
center=self.vision_encoder_center,
num_frames=self.video_input_num_frames,
drop_path=self.vision_encoder_drop_path_rate,
checkpoint_num=self.vision_encoder_checkpoint_num,
)
elif encoder_name == "vit_b16":
vision_encoder = clip_joint_b16(
pretrained=self.vision_encoder_pretrained,
input_resolution=self.inputs_image_res,
kernel_size=self.vision_encoder_kernel_size,
center=self.vision_encoder_center,
num_frames=self.video_input_num_frames,
drop_path=self.vision_encoder_drop_path_rate,
checkpoint_num=self.vision_encoder_checkpoint_num,
)
else:
raise NotImplementedError(f"Not implemented: {encoder_name}")
return vision_encoder
def build_text_encoder(self):
"""build text_encoder and possiblly video-to-text multimodal fusion encoder.
Returns: nn.Module. The text encoder
"""
encoder_name = self.text_encoder_name
if encoder_name == "vit_l14":
text_encoder = clip_text_l14(
pretrained=self.text_encoder_pretrained,
context_length=self.max_txt_l,
vocab_size=self.text_encoder_vocab_size,
checkpoint_num=0,
)
elif encoder_name == "vit_b16":
text_encoder = clip_text_b16(
pretrained=self.text_encoder_pretrained,
context_length=self.max_txt_l,
vocab_size=self.text_encoder_vocab_size,
checkpoint_num=0,
)
else:
raise NotImplementedError(f"Not implemented: {encoder_name}")
return text_encoder
def get_text_encoder(self):
"""get text encoder, used for text and cross-modal encoding"""
encoder = self.text_encoder
return encoder.bert if hasattr(encoder, "bert") else encoder
def get_text_features(self, input_text, tokenizer, text_feature_dict={}):
if input_text in text_feature_dict:
return text_feature_dict[input_text]
text_template= f"{input_text}"
with torch.no_grad():
# text_token = tokenizer.encode(text_template).cuda()
text_features = self.encode_text(text_template).float()
text_features /= text_features.norm(dim=-1, keepdim=True)
text_feature_dict[input_text] = text_features
return text_features
def get_vid_features(self, input_frames):
with torch.no_grad():
clip_feat = self.encode_vision(input_frames,test=True).float()
clip_feat /= clip_feat.norm(dim=-1, keepdim=True)
return clip_feat
def get_predict_label(self, clip_feature, text_feats_tensor, top=5):
label_probs = (100.0 * clip_feature @ text_feats_tensor.T).softmax(dim=-1)
top_probs, top_labels = label_probs.cpu().topk(top, dim=-1)
return top_probs, top_labels
if __name__ =="__main__":
tokenizer = _Tokenizer()