# Spaces: Running on Zero
import os | |
import logging | |
import torch | |
from einops import rearrange | |
from torch import nn | |
import math | |
# from .criterions import VTC_VTM_Loss | |
from .simple_tokenizer import SimpleTokenizer as _Tokenizer | |
from .viclip_vision import clip_joint_l14, clip_joint_b16 | |
from .viclip_text import clip_text_l14, clip_text_b16 | |
logger = logging.getLogger(__name__) | |
class ViCLIP(nn.Module):
    """Video-text contrastive model pairing a vision encoder with a text encoder.

    The constructor builds both encoders (the ``vit_l14`` or ``vit_b16``
    variants, selected by ``size``), creates a learnable temperature ``temp``,
    optionally loads a checkpoint and optionally freezes the text encoder.
    """

    # Maps the user-facing ``size`` flag to the encoder name used by the builders.
    _ENCODER_NAMES = {'l': 'vit_l14', 'b': 'vit_b16'}

    def __init__(self,
                 tokenizer=None,
                 size='l',
                 pretrain=os.path.join(os.path.dirname(os.path.abspath(__file__)), "ViClip-InternVid-10M-FLT.pth"),
                 freeze_text=True):
        """Build the encoders and optionally load pretrained weights.

        Args:
            tokenizer: Tokenizer stored on ``self.tokenizer``. When falsy, a
                new ``SimpleTokenizer`` is created.
            size (str): ``'l'`` or ``'b'`` (case-insensitive); selects the
                encoder variant for both the vision and the text tower.
            pretrain: Checkpoint path handed to ``torch.load``. The file must
                contain a ``'model'`` entry holding the state dict. A falsy
                value skips loading.
            freeze_text (bool): When true, the text encoder's parameters get
                ``requires_grad = False``.

        Raises:
            NotImplementedError: If ``size`` is neither ``'l'`` nor ``'b'``.
        """
        super().__init__()

        self.tokenizer = tokenizer if tokenizer else _Tokenizer()
        self.max_txt_l = 32

        # Validate ``size`` once; both towers use the same variant name.
        size_key = size.lower()
        if size_key not in self._ENCODER_NAMES:
            raise NotImplementedError(f"Size {size} not implemented")
        encoder_name = self._ENCODER_NAMES[size_key]

        # Vision tower configuration (consumed by build_vision_encoder).
        self.vision_encoder_name = encoder_name
        self.vision_encoder_pretrained = False
        self.inputs_image_res = 224
        self.vision_encoder_kernel_size = 1
        self.vision_encoder_center = True
        self.video_input_num_frames = 8
        self.vision_encoder_drop_path_rate = 0.1
        self.vision_encoder_checkpoint_num = 24
        self.is_pretrain = pretrain
        # The three widths below are stored but not read anywhere in this class.
        self.vision_width = 1024
        self.text_width = 768
        self.embed_dim = 768
        # Forwarded to the vision encoder by encode_vision unless test=True.
        self.masking_prob = 0.9

        # Text tower configuration (consumed by build_text_encoder).
        self.text_encoder_name = encoder_name
        self.text_encoder_pretrained = False  # 'bert-base-uncased'
        self.text_encoder_d_model = 768
        self.text_encoder_vocab_size = 49408

        # Per-instance cache used by get_text_features when the caller does
        # not supply one, so features never leak between model instances.
        self._text_feature_cache = {}

        # create modules.
        self.vision_encoder = self.build_vision_encoder()
        self.text_encoder = self.build_text_encoder()

        self.temp = nn.parameter.Parameter(torch.ones([]) * 1 / 100.0)
        self.temp_min = 1 / 100.0

        if pretrain:
            logger.info("Load pretrained weights from %s", pretrain)
            # NOTE(security): torch.load unpickles the file; only load
            # checkpoints that come from a trusted source.
            state_dict = torch.load(pretrain, map_location='cpu')['model']
            self.load_state_dict(state_dict)

        # Freeze weights
        if freeze_text:
            self.freeze_text()

    def freeze_text(self):
        """Disable gradients for every parameter of the text encoder."""
        for param in self.text_encoder.parameters():
            param.requires_grad = False

    def no_weight_decay(self):
        """Return the set of parameter names excluded from weight decay.

        Contains ``"temp"`` plus the names reported by each encoder's own
        ``no_weight_decay()``, prefixed with the encoder attribute name.
        """
        ret = {"temp"}
        ret.update(
            {"vision_encoder." + k for k in self.vision_encoder.no_weight_decay()}
        )
        ret.update(
            {"text_encoder." + k for k in self.text_encoder.no_weight_decay()}
        )
        return ret

    def forward(self, image, text, raw_text, idx, log_generation=None, return_sims=False):
        """Encode both modalities and return similarities or the VTC loss.

        Args:
            image (torch.Tensor): Input passed to ``encode_vision``
                (documented upstream as shape [B,T,C,H,W]).
            text: Unused by this method; kept for interface compatibility.
            raw_text: Raw text passed to ``encode_text``.
            idx: Forwarded unchanged to ``clip_loss.vtc_loss``.
            log_generation: Unused by this method.
            return_sims (bool): When true, return the similarity matrix
                instead of computing the loss.

        Returns:
            The product of the L2-normalized vision and text embeddings when
            ``return_sims`` is true, otherwise ``dict(loss_vtc=...)``.

        Raises:
            AttributeError: If the loss is requested but no ``clip_loss``
                object has been attached to the model.
        """
        self.clip_contrastive_temperature()

        vision_embeds = self.encode_vision(image)
        text_embeds = self.encode_text(raw_text)

        if return_sims:
            vision_unit = torch.nn.functional.normalize(vision_embeds, dim=-1)
            text_unit = torch.nn.functional.normalize(text_embeds, dim=-1)
            return vision_unit @ text_unit.transpose(0, 1)

        # calculate loss
        # The criterion is not created by __init__ (its import is commented
        # out at module level), so fail with an actionable message.
        clip_loss = getattr(self, "clip_loss", None)
        if clip_loss is None:
            raise AttributeError(
                "ViCLIP.clip_loss is not configured: attach an object exposing "
                "vtc_loss() as `model.clip_loss`, or call forward(..., return_sims=True)."
            )

        ## VTC loss
        loss_vtc = clip_loss.vtc_loss(
            vision_embeds, text_embeds, idx, self.temp, all_gather=True
        )
        return dict(
            loss_vtc=loss_vtc,
        )

    def encode_vision(self, image, test=False):
        """Encode images / videos with the vision encoder.

        A 5-D input has its dimensions 1 and 2 swapped before encoding; any
        other input gets a singleton dimension inserted at position 2.

        Args:
            image (torch.Tensor): The input images or video frames.
            test (bool): When true, the encoder is called without masking.

        Returns:
            The output of ``self.vision_encoder``. ``masking_prob`` is passed
            to the encoder only when ``test`` is false and
            ``self.masking_prob > 0``.
        """
        if image.ndim == 5:
            image = image.permute(0, 2, 1, 3, 4).contiguous()
        else:
            image = image.unsqueeze(2)

        if not test and self.masking_prob > 0.0:
            return self.vision_encoder(
                image, masking_prob=self.masking_prob
            )

        return self.vision_encoder(image)

    def encode_text(self, text):
        """Tokenize raw text and encode it with the text encoder.

        Args:
            text: Raw text accepted by ``self.text_encoder.tokenize``
                (``get_text_features`` passes a single ``str``).

        Returns:
            The output of ``self.text_encoder`` for the tokenized text. Tokens
            are moved to the device of the text encoder's first parameter.
        """
        device = next(self.text_encoder.parameters()).device
        tokens = self.text_encoder.tokenize(
            text, context_length=self.max_txt_l
        ).to(device)
        return self.text_encoder(tokens)

    def clip_contrastive_temperature(self, min_val=0.001, max_val=0.5):
        """Clamp ``temp`` in place so it never falls below ``temp_min``.

        ``min_val`` and ``max_val`` are accepted for interface compatibility
        but are not used; the lower bound is always ``self.temp_min``.
        """
        # ``temp`` is a leaf parameter that requires grad, so the in-place
        # clamp must run with autograd disabled or PyTorch raises.
        with torch.no_grad():
            self.temp.clamp_(min=self.temp_min)

    def build_vision_encoder(self):
        """Build the vision encoder selected by ``self.vision_encoder_name``.

        Returns:
            The encoder created by ``clip_joint_l14`` or ``clip_joint_b16``.

        Raises:
            NotImplementedError: If the encoder name is not recognized.
        """
        encoder_name = self.vision_encoder_name
        builders = {"vit_l14": clip_joint_l14, "vit_b16": clip_joint_b16}
        if encoder_name not in builders:
            raise NotImplementedError(f"Not implemented: {encoder_name}")

        return builders[encoder_name](
            pretrained=self.vision_encoder_pretrained,
            input_resolution=self.inputs_image_res,
            kernel_size=self.vision_encoder_kernel_size,
            center=self.vision_encoder_center,
            num_frames=self.video_input_num_frames,
            drop_path=self.vision_encoder_drop_path_rate,
            checkpoint_num=self.vision_encoder_checkpoint_num,
        )

    def build_text_encoder(self):
        """Build the text encoder selected by ``self.text_encoder_name``.

        Returns:
            The encoder created by ``clip_text_l14`` or ``clip_text_b16``.

        Raises:
            NotImplementedError: If the encoder name is not recognized.
        """
        encoder_name = self.text_encoder_name
        builders = {"vit_l14": clip_text_l14, "vit_b16": clip_text_b16}
        if encoder_name not in builders:
            raise NotImplementedError(f"Not implemented: {encoder_name}")

        return builders[encoder_name](
            pretrained=self.text_encoder_pretrained,
            context_length=self.max_txt_l,
            vocab_size=self.text_encoder_vocab_size,
            checkpoint_num=0,
        )

    def get_text_encoder(self):
        """Return ``text_encoder.bert`` when that attribute exists, else the text encoder itself."""
        encoder = self.text_encoder
        return encoder.bert if hasattr(encoder, "bert") else encoder

    def get_text_features(self, input_text, tokenizer, text_feature_dict=None):
        """Return L2-normalized text features for ``input_text``, with caching.

        Args:
            input_text: Text to encode; also used as the cache key, so it
                must be hashable.
            tokenizer: Unused; kept for interface compatibility.
            text_feature_dict (dict | None): Cache mapping text to features.
                When omitted, a cache private to this model instance is used
                (a shared default dict would mix features of different models).

        Returns:
            The cached features if present, otherwise freshly encoded float
            features divided by their norm along the last dimension.
        """
        if text_feature_dict is None:
            text_feature_dict = self._text_feature_cache

        if input_text in text_feature_dict:
            return text_feature_dict[input_text]

        text_template = f"{input_text}"
        with torch.no_grad():
            text_features = self.encode_text(text_template).float()
            text_features /= text_features.norm(dim=-1, keepdim=True)
            text_feature_dict[input_text] = text_features
        return text_features

    def get_vid_features(self, input_frames):
        """Return L2-normalized vision features computed without gradients or masking."""
        with torch.no_grad():
            clip_feat = self.encode_vision(input_frames, test=True).float()
            clip_feat /= clip_feat.norm(dim=-1, keepdim=True)
        return clip_feat

    def get_predict_label(self, clip_feature, text_feats_tensor, top=5):
        """Return the top-``top`` label probabilities and indices.

        The similarities ``clip_feature @ text_feats_tensor.T`` are scaled by
        100, turned into probabilities with a softmax over the last dimension,
        moved to the CPU and reduced with ``topk``.

        Returns:
            tuple: ``(top_probs, top_labels)`` as produced by ``topk``.
        """
        label_probs = (100.0 * clip_feature @ text_feats_tensor.T).softmax(dim=-1)
        top_probs, top_labels = label_probs.cpu().topk(top, dim=-1)
        return top_probs, top_labels
if __name__ == "__main__":
    # Running the module directly only constructs the default tokenizer.
    tokenizer = _Tokenizer()