# app/translator/__init__.py
import math
import pickle
from collections import deque
from os.path import exists
from subprocess import call, DEVNULL
import cv2 as cv
import numpy as np
import scipy.ndimage
import scipy.special
import torch
from torch.nn import DataParallel, Module
from app.config import Config
from app.models import InceptionI3d
class Translator:
def __init__(self, confidence: float):
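        """Load the sign-recognition model and vocabulary; only predictions above `confidence` are reported."""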
self.confidence = confidence
self.model = self.load_model(Config.checkpoint_path, Config.number_of_classes, Config.number_of_frames)
self.word_data = self.load_vocabulary(Config.vocabulary_path)
self.result = ""
def resize_generic(self, img, oheight, owidth, interp="bilinear", is_flow=False):
"""
Args
inp: numpy array: RGB image (H, W, 3) | video with 3*nframes (H, W, 3*nframes)
| single channel image (H, W, 1) | -- not supported: video with (nframes, 3, H, W)
"""
        ht = img.shape[0]
        wd = img.shape[1]
        chn = img.shape[2]
        if chn == 1:
            # Single-channel image: OpenCV drops the channel axis, so restore it after resizing.
            resized_img = cv.resize(img.squeeze(), (owidth, oheight)).reshape(
                (oheight, owidth, chn)
            )
        elif chn == 3:
            resized_img = cv.resize(img, (owidth, oheight))  # note: cv.resize takes (width, height)
        elif chn == 2:
            # 2-channel input is treated as optical flow; resize each channel separately.
            # scipy.ndimage.zoom expects per-axis scale factors, not a target size.
            resized_img = np.zeros((oheight, owidth, chn), dtype=img.dtype)
            for t in range(chn):
                resized_img[:, :, t] = scipy.ndimage.zoom(
                    img[:, :, t], (oheight / ht, owidth / wd)
                )
else:
in_chn = 3
# Workaround, would be better to pass #frames
if chn == 16:
in_chn = 1
if chn == 32:
in_chn = 2
nframes = int(chn / in_chn)
img = img.reshape(img.shape[0], img.shape[1], in_chn, nframes)
resized_img = np.zeros((oheight, owidth, in_chn, nframes), dtype=img.dtype)
            for t in range(nframes):
                frame = img[:, :, :, t]  # equivalently img[:, :, t*3:t*3+3]
                frame = cv.resize(frame, (owidth, oheight)).reshape(
                    oheight, owidth, in_chn
                )
                resized_img[:, :, :, t] = frame
resized_img = resized_img.reshape(
resized_img.shape[0], resized_img.shape[1], chn
)
        if is_flow:
            # Scale flow magnitudes to match the new spatial resolution.
            resized_img = resized_img * oheight / ht
        return resized_img
def color_normalize(self, x, mean, std):
"""Normalize a tensor of images by subtracting (resp. dividing) by the mean (resp.
std. deviation) statistics of a dataset in RGB space.
"""
if x.dim() in {3, 4}:
if x.size(0) == 1:
x = x.repeat(3, 1, 1)
assert x.size(0) == 3, "For single video format, expected RGB along first dim"
for t, m, s in zip(x, mean, std):
t.sub_(m)
t.div_(s)
elif x.dim() == 5:
assert (
x.shape[1] == 3
), "For batched video format, expected RGB along second dim"
x[:, 0].sub_(mean[0]).div_(std[0])
x[:, 1].sub_(mean[1]).div_(std[1])
x[:, 2].sub_(mean[2]).div_(std[2])
return x
def to_torch(self, ndarray):
if type(ndarray).__module__ == "numpy":
return torch.from_numpy(ndarray)
elif not torch.is_tensor(ndarray):
raise ValueError(f"Cannot convert {type(ndarray)} to torch tensor")
return ndarray
def to_numpy(self, tensor):
if torch.is_tensor(tensor):
return tensor.cpu().numpy()
elif type(tensor).__module__ != "numpy":
raise ValueError(f"Cannot convert {type(tensor)} to numpy array")
return tensor
def im_to_numpy(self, img):
img = self.to_numpy(img)
img = np.transpose(img, (1, 2, 0)) # H*W*C
return img
def im_to_torch(self, img):
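        """Convert an H*W*C image to a C*H*W float tensor, scaling 0-255 inputs down to [0, 1]."""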
img = np.transpose(img, (2, 0, 1)) # C*H*W
img = self.to_torch(img).float()
return img / 255 if img.max() > 1 else img
def load_model(self, checkpoint_path: str, number_of_classes: int, number_of_frames: int) -> Module:
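        """Build the InceptionI3d network (wrapped in DataParallel on GPU), load the trained checkpoint and switch to eval mode."""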
model = DataParallel(InceptionI3d(
number_of_classes,
spatiotemporal_squeeze=True,
final_endpoint='Logits',
name="inception_i3d",
in_channels=3,
dropout_keep_prob=0.5,
num_in_frames=number_of_frames
)).cuda()
        if not exists(checkpoint_path):
            # Reassemble the checkpoint from the split parts shipped under app/checkpoints/.
            call(f'cat app/checkpoints/* >> {checkpoint_path}', shell=True, stdout=DEVNULL)
checkpoint = torch.load(checkpoint_path)
model.load_state_dict(checkpoint['state_dict'])
model.eval()
return model
def load_vocabulary(self, vocabulary_path: str) -> dict:
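        """Load the pickled vocabulary; `word_data["words"][i]` is the word for class index `i`."""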
with open(vocabulary_path, 'rb') as file:
return pickle.load(file)
    def prepare_input(
        self,
        video: deque,
        input_resolution: int = 224,
        resize_resolution: int = 256,
        mean: torch.Tensor = 0.5 * torch.ones(3),
        std: torch.Tensor = 1.0 * torch.ones(3),
    ) -> torch.Tensor:
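        """Turn BGR frames into a normalized (C, T, H, W) tensor: BGR-to-RGB conversion, resize to
        `resize_resolution`, center-crop to `input_resolution`, then mean/std normalization."""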
video_tensor = torch.stack(
[self.im_to_torch(frame[:, :, [2, 1, 0]]) for frame in video]
).permute(1, 0, 2, 3)
iC, iF, _, _ = video_tensor.shape
video_tensor_resized = np.zeros((iF, resize_resolution, resize_resolution, iC))
for t in range(iF):
tmp = video_tensor[:, t, :, :]
tmp = self.resize_generic(
self.im_to_numpy(tmp), resize_resolution, resize_resolution, interp="bilinear", is_flow=False
)
video_tensor_resized[t] = tmp
video_tensor_resized = np.transpose(video_tensor_resized, (3, 0, 1, 2))
        # Center-crop coordinates
        ulx = int((resize_resolution - input_resolution) / 2)
        uly = int((resize_resolution - input_resolution) / 2)
        # Center-crop to input_resolution x input_resolution
        video_tensor_resized = video_tensor_resized[:, :, uly : uly + input_resolution, ulx : ulx + input_resolution]
video_tensor_resized = self.to_torch(video_tensor_resized).float()
assert video_tensor_resized.max() <= 1
video_tensor_resized = self.color_normalize(video_tensor_resized, mean, std)
return video_tensor_resized
def sliding_windows(self, input_video: torch.Tensor, number_of_frames: int, stride: int) -> torch.Tensor:
"""
Return sliding windows and corresponding (middle) timestamp
"""
C, nFrames, H, W = input_video.shape
# If needed, pad to the minimum clip length
if nFrames < number_of_frames:
rgb_ = torch.zeros(C, number_of_frames, H, W)
rgb_[:, :nFrames] = input_video
rgb_[:, nFrames:] = input_video[:, -1].unsqueeze(1)
input_video = rgb_
nFrames = input_video.shape[1]
num_clips = math.ceil((nFrames - number_of_frames) / stride) + 1
rgb_slided = torch.zeros(num_clips, 3, number_of_frames, H, W)
        # For each clip, take a window of number_of_frames frames; the final window is shifted
        # back so that it still fits entirely within the video.
        for j in range(num_clips):
            stride_j = j * stride
            actual_clip_length = min(number_of_frames, nFrames - stride_j)
            t_beg = stride_j if actual_clip_length == number_of_frames else nFrames - number_of_frames
rgb_slided[j] = input_video[:, t_beg : t_beg + number_of_frames, :, :]
return rgb_slided
def video_to_asl(self, video: deque):
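        """Run the full pipeline on a buffer of frames and store the top word in `self.result`
        (empty string when the best probability does not exceed the confidence threshold)."""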
input_video = self.prepare_input(video)
input_sliding_window = self.sliding_windows(input_video, Config.number_of_frames, Config.stride)
num_clips = input_sliding_window.shape[0]
# Group the clips into batches
num_batches = math.ceil(num_clips / Config.batch_size)
raw_scores = np.empty((0, Config.number_of_classes), dtype=float)
for b in range(num_batches):
inp = input_sliding_window[b * Config.batch_size : (b + 1) * Config.batch_size]
            # Forward pass; no gradients are needed at inference time.
            with torch.no_grad():
                out = self.model(inp)
            raw_scores = np.append(raw_scores, out["logits"].cpu().numpy(), axis=0)
prob_scores = scipy.special.softmax(raw_scores, axis=1)
prob_sorted = np.sort(prob_scores, axis=1)[:, ::-1]
pred_sorted = np.argsort(prob_scores, axis=1)[:, ::-1]
word_topk = np.empty((Config.topk, num_clips), dtype=object)
for k in range(Config.topk):
for i, p in enumerate(pred_sorted[:, k]):
word_topk[k, i] = self.word_data["words"][p]
prob_topk = prob_sorted[:, :Config.topk].transpose()
        # Accept the top-1 word of the first clip only if it clears the confidence threshold.
        self.result = "" if prob_topk[0, 0] <= self.confidence else word_topk[0, 0]
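

# Minimal usage sketch (not part of the original pipeline): it assumes a webcam at index 0 and a
# CUDA-capable machine, buffers the most recent Config.number_of_frames frames in a deque, and
# prints a word whenever the model is confident enough. The buffer size and the confidence value
# are illustrative assumptions, not values taken from the application.
if __name__ == "__main__":
    translator = Translator(confidence=0.5)
    frames = deque(maxlen=Config.number_of_frames)
    capture = cv.VideoCapture(0)
    while True:
        ok, frame = capture.read()
        if not ok:
            break
        frames.append(frame)
        if len(frames) == frames.maxlen:
            translator.video_to_asl(frames)
            if translator.result:
                print(translator.result)
    capture.release()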