Spaces:
Running
on
Zero
Running
on
Zero
import numpy as np | |
import cv2 | |
import os | |
import io | |
import torch | |
from torch import nn | |
import sys | |
from models.backbones.internvideo2 import pretrain_internvideo2_1b_patch14_224 | |
from models.backbones.bert.builder import build_bert | |
# from models.criterions import get_sim | |
from models.backbones.internvideo2.pos_embed import interpolate_pos_embed_internvideo2_new | |
from models.backbones.bert.tokenization_bert import BertTokenizer | |
def _frame_from_video(video): | |
while video.isOpened(): | |
success, frame = video.read() | |
if success: | |
yield frame | |
else: | |
break | |
v_mean = np.array([0.485, 0.456, 0.406]).reshape(1,1,3) | |
v_std = np.array([0.229, 0.224, 0.225]).reshape(1,1,3) | |
def normalize(data): | |
return (data/255.0-v_mean)/v_std | |
def frames2tensor(vid_list, fnum=8, target_size=(224, 224), device=torch.device('cuda')): | |
assert(len(vid_list) >= fnum) | |
step = len(vid_list) // fnum | |
vid_list = vid_list[::step][:fnum] | |
vid_list = [cv2.resize(x[:,:,::-1], target_size) for x in vid_list] | |
vid_tube = [np.expand_dims(normalize(x), axis=(0, 1)) for x in vid_list] | |
vid_tube = np.concatenate(vid_tube, axis=1) | |
vid_tube = np.transpose(vid_tube, (0, 1, 4, 2, 3)) | |
vid_tube = torch.from_numpy(vid_tube).to(device, non_blocking=True).float() | |
return vid_tube | |
def get_text_feat_dict(texts, clip, text_feat_d={}): | |
for t in texts: | |
feat = clip.get_txt_feat(t) | |
text_feat_d[t] = feat | |
return text_feat_d | |
def get_vid_feat(frames, vlm): | |
return vlm.get_vid_features(frames) | |
def retrieve_text(frames, | |
texts, | |
model, | |
topk:int=5, | |
config: dict={}, | |
device=torch.device('cuda')): | |
vlm = model | |
vlm = vlm.to(device) | |
fn = config.get('num_frames', 8) | |
size_t = config.get('size_t', 224) | |
frames_tensor = frames2tensor(frames, fnum=fn, target_size=(size_t, size_t), device=device) | |
vid_feat = vlm.get_vid_features(frames_tensor) | |
print('Video', vid_feat.mean(dim=-1)) | |
text_feat_d = {} | |
text_feat_d = get_text_feat_dict(texts, vlm, text_feat_d) | |
text_feats = [text_feat_d[t] for t in texts] | |
text_feats_tensor = torch.cat(text_feats, 0) | |
print('Text', text_feats_tensor.mean(dim=-1)) | |
probs, idxs = vlm.predict_label(vid_feat, text_feats_tensor, top=topk) | |
ret_texts = [texts[i] for i in idxs.long().numpy()[0].tolist()] | |
return ret_texts, probs.float().numpy()[0] | |
def setup_internvideo2(config: dict): | |
if "bert" in config.model.text_encoder.name: | |
tokenizer = BertTokenizer.from_pretrained(config.model.text_encoder.pretrained, local_files_only=True) | |
model = InternVideo2_Stage2(config=config, tokenizer=tokenizer, is_pretrain=True) | |
else: | |
model = InternVideo2_Stage2(config=config, is_pretrain=True) | |
tokenizer = model.tokenizer | |
if config.get('compile_model', False): | |
torch.set_float32_matmul_precision('high') | |
model = torch.compile(model) | |
model = model.to(torch.device(config.device)) | |
model_without_ddp = model | |
if (config.pretrained_path.strip() and (os.path.isfile(config.pretrained_path)) or "s3://" in config.pretrained_path): | |
checkpoint = torch.load(config.pretrained_path, map_location="cpu") | |
try: | |
if "model" in checkpoint.keys(): | |
state_dict = checkpoint["model"] | |
else: | |
state_dict = checkpoint["module"] # This is a deepspeed stage 1 model | |
except: | |
state_dict = checkpoint | |
# Note: this was a temporary fix due to the bug caused by is_pretrain=False | |
# from collections import OrderedDict | |
# state_dict = OrderedDict({ k.replace('text_encoder.bert', 'text_encoder') : state_dict[k] for k in state_dict}) | |
if config.get('origin_num_frames', None) is not None: | |
a = len(state_dict) | |
interpolate_pos_embed_internvideo2_new(state_dict, model_without_ddp.vision_encoder, orig_t_size=config.origin_num_frames) | |
assert a == len(state_dict), state_dict.keys() | |
msg = model_without_ddp.load_state_dict(state_dict, strict=False) | |
print(f"load_state_dict: {msg}") | |
if config.get('use_bf16', False): | |
model_without_ddp = model_without_ddp.to(torch.bfloat16) | |
elif config.get('use_half_precision', False): | |
model_without_ddp = model_without_ddp.to(torch.float16) | |
else: | |
model_without_ddp = model_without_ddp.to(torch.float32) | |
return (model_without_ddp, tokenizer,) | |
class InternVideo2_Stage2(nn.Module): | |
"""docstring for InternVideo2_Stage2""" | |
def __init__(self, | |
config, | |
tokenizer, | |
is_pretrain: bool=True): | |
super(InternVideo2_Stage2, self).__init__() | |
self.config = config | |
self.tokenizer = tokenizer | |
self.is_pretrain = is_pretrain | |
self.vision_width = config.model.vision_encoder.clip_embed_dim | |
self.text_width = config.model.text_encoder.d_model | |
self.embed_dim = config.model.embed_dim | |
# create modules. | |
self.vision_encoder = self.build_vision_encoder() | |
self.freeze_vision() | |
self.text_encoder = self.build_text_encoder() | |
self.freeze_text() | |
self.vision_proj = nn.Linear(self.vision_width, self.embed_dim) | |
self.text_proj = nn.Linear(self.text_width, self.embed_dim) | |
def freeze_vision(self): | |
"""freeze vision encoder""" | |
for p in self.vision_encoder.parameters(): | |
p.requires_grad = False | |
def freeze_text(self): | |
"""freeze text encoder""" | |
for p in self.text_encoder.parameters(): | |
p.requires_grad = False | |
def dtype(self): | |
return self.vision_encoder.patch_embed.proj.weight.dtype | |
def encode_vision(self, | |
image: torch.Tensor, | |
test: bool=False): | |
"""encode image / videos as features. | |
Args: | |
image (torch.Tensor): The input images. | |
test (bool): Whether testing. | |
Returns: tuple. | |
- vision_embeds (torch.Tensor): The output features. Shape: [B,N,C]. | |
- pooled_vision_embeds (torch.Tensor): The pooled output features. Shape: [B,1,C]. | |
- student_output (torch.Tensor): The features of alignment. Shape: [K,B,N,C]. | |
- clip_output (torch.Tensor): The features of clip. Shape: [K,B,N,C]. | |
""" | |
T = image.shape[1] | |
use_image = True if T == 1 else False | |
image = image.permute(0, 2, 1, 3, 4).to(self.dtype) # [B,T,C,H,W] -> [B,C,T,H,W] | |
# whether save temporal dimension | |
# keep_temporal=self.config.model.vision_encoder.keep_temporal | |
if test: | |
vision_embeds, pooled_vision_embeds, _, _ = self.vision_encoder( | |
image, None, use_image) | |
return vision_embeds, pooled_vision_embeds | |
else: | |
mask, targets_clip_middle_vis, targets_clip_final_vis = self.encode_teacher(image) | |
# if mask is not None and (self.video_mask_type != 'tube' or self.image_mask_type != 'tube'): | |
# keep_temporal = False | |
# print(f"\033[31mmask is {type(mask)}\033[0m") | |
vision_embeds, pooled_vision_embeds, student_output, student_output_final = self.vision_encoder( | |
image, mask, use_image) | |
return vision_embeds, pooled_vision_embeds, student_output, student_output_final, targets_clip_middle_vis, targets_clip_final_vis | |
def encode_text(self, | |
text: dict): | |
"""encode text. | |
Args: | |
text (dict): The output of huggingface's `PreTrainedTokenizer`. contains keys: | |
- input_ids (torch.Tensor): Token ids to be fed to a model. Shape: [B,L]. | |
- attention_mask (torch.Tensor): The mask indicate padded tokens. Shape: [B,L]. 0 is padded token. | |
- other keys refer to "https://huggingface.co/docs/transformers/v4.21.2/en/main_classes/tokenizer#transformers.PreTrainedTokenizer.__call__". | |
Returns: tuple. | |
- text_embeds (torch.Tensor): The features of all tokens. Shape: [B,L,C]. | |
- pooled_text_embeds (torch.Tensor): The pooled features. Shape: [B,C]. | |
""" | |
text_output = self.get_text_encoder()( | |
text.input_ids, | |
attention_mask=text.attention_mask, | |
return_dict=True, | |
mode="text", | |
) | |
text_embeds = text_output.last_hidden_state | |
pooled_text_embeds = text_embeds[:, 0] | |
return text_embeds, pooled_text_embeds | |
def build_vision_encoder(self): | |
"""build vision encoder | |
Returns: (vision_encoder, clip_teacher). Each is a `nn.Module`. | |
""" | |
encoder_name = self.config.model.vision_encoder.name | |
if encoder_name == 'pretrain_internvideo2_1b_patch14_224': | |
vision_encoder = pretrain_internvideo2_1b_patch14_224(self.config.model) | |
else: | |
raise ValueError(f"Not implemented: {encoder_name}") | |
# parameters for mask | |
img_size = self.config.model.vision_encoder.img_size | |
num_frames = self.config.model.vision_encoder.num_frames | |
tublet_size = self.config.model.vision_encoder.tubelet_size | |
patch_size = self.config.model.vision_encoder.patch_size | |
self.clip_img_size = self.config.model.vision_encoder.clip_input_resolution | |
self.video_mask_type = self.config.model.vision_encoder.video_mask_type | |
self.video_window_size = (num_frames // tublet_size, img_size // patch_size, img_size // patch_size) | |
self.video_mask_ratio = self.config.model.vision_encoder.video_mask_ratio | |
self.image_mask_type = self.config.model.vision_encoder.image_mask_type | |
self.image_window_size = (1, img_size // patch_size, img_size // patch_size) | |
self.image_mask_ratio = self.config.model.vision_encoder.image_mask_ratio | |
return vision_encoder | |
def build_text_encoder(self): | |
"""build text_encoder and possiblly video-to-text multimodal fusion encoder. | |
Returns: nn.Module. The text encoder | |
""" | |
encoder_name = self.config.model.text_encoder.name | |
if "bert" in encoder_name: | |
text_encoder = build_bert( | |
self.config.model, | |
self.is_pretrain, | |
self.config.gradient_checkpointing, | |
) | |
else: | |
raise ValueError(f"Not implemented: {encoder_name}") | |
return text_encoder | |
def get_text_encoder(self): | |
"""get text encoder, used for text and cross-modal encoding""" | |
encoder = self.text_encoder | |
return encoder.bert if hasattr(encoder, "bert") else encoder | |
def get_vid_features(self, | |
frames: torch.Tensor): | |
"""get the video features for the given frames. | |
Args: | |
frames (torch.Tensor): The input frames. Shape: [B,T,C,H,W]. | |
Returns: tuple. | |
- vision_embeds (torch.Tensor): The output features. Shape: [B,N,C]. | |
- pooled_vision_embeds (torch.Tensor): The pooled output features. Shape: [B,1,C]. | |
""" | |
with torch.no_grad(): | |
_, vfeat = self.encode_vision(frames, test=True) | |
vfeat = self.vision_proj(vfeat) | |
vfeat /= vfeat.norm(dim=-1, keepdim=True) | |
return vfeat | |
def get_txt_feat(self, | |
text: str): | |
"""get the text features for the given text.""" | |
device = next(self.parameters()).device | |
with torch.no_grad(): | |
text = self.tokenizer( | |
text, | |
padding="max_length", | |
truncation=True, | |
max_length=self.config.max_txt_l, | |
return_tensors="pt",).to(device) | |
_, tfeat = self.encode_text(text) | |
tfeat = self.text_proj(tfeat) | |
tfeat /= tfeat.norm(dim=-1, keepdim=True) | |
return tfeat | |
def predict_label(self, | |
vid_feat: torch.Tensor, | |
txt_feat: torch.Tensor, | |
top: int=5): | |
label_probs = (100.0 * vid_feat @ txt_feat.T).softmax(dim=-1) | |
top_probs, top_labels = label_probs.float().cpu().topk(top, dim=-1) | |
return top_probs, top_labels | |