|
import math |
|
import pickle |
|
from collections import deque |
|
from os.path import exists |
|
from subprocess import call, DEVNULL |
|
|
|
import cv2 as cv |
|
import numpy as np |
|
import scipy.ndimage
import scipy.special
|
import torch |
|
from torch.nn import DataParallel, Module |
|
|
|
from app.config import Config |
|
from app.models import InceptionI3d |
|
|
|
|
|
class Translator: |
|
|
|
def __init__(self, confidence: float): |
|
|
|
self.confidence = confidence |
|
self.model = self.load_model(Config.checkpoint_path, Config.number_of_classes, Config.number_of_frames) |
|
self.word_data = self.load_vocabulary(Config.vocabulary_path) |
|
self.result = "" |
|
|
|
|
|
def resize_generic(self, img, oheight, owidth, interp="bilinear", is_flow=False): |
|
""" |
|
Args |
|
inp: numpy array: RGB image (H, W, 3) | video with 3*nframes (H, W, 3*nframes) |
|
| single channel image (H, W, 1) | -- not supported: video with (nframes, 3, H, W) |
|
""" |
|
|
|
|
|
        ht = img.shape[0]
        wd = img.shape[1]
        chn = img.shape[2]
|
|
|
        if chn == 1:
            # scipy.misc.imresize was removed in SciPy 1.3; cv.resize performs the
            # equivalent bilinear resize on the single-channel image.
            resized_img = cv.resize(
                img.squeeze(), (owidth, oheight), interpolation=cv.INTER_LINEAR
            ).reshape((oheight, owidth, chn))
|
elif chn == 3: |
|
|
|
resized_img = cv.resize(img, (owidth, oheight)) |
|
        elif chn == 2:
            # Two-channel (optical flow) image: resize each channel separately.
            # scipy.ndimage.zoom expects scale factors, not target sizes.
            resized_img = np.zeros((oheight, owidth, chn), dtype=img.dtype)
            for t in range(chn):
                resized_img[:, :, t] = scipy.ndimage.zoom(
                    img[:, :, t], [oheight / ht, owidth / wd]
                )
|
else: |
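            # Channel-stacked video: infer how many channels each frame has
            # (16 stacked channels -> grayscale, 32 -> two-channel flow, otherwise RGB).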
|
in_chn = 3 |
|
|
|
if chn == 16: |
|
in_chn = 1 |
|
if chn == 32: |
|
in_chn = 2 |
|
nframes = int(chn / in_chn) |
|
img = img.reshape(img.shape[0], img.shape[1], in_chn, nframes) |
|
resized_img = np.zeros((oheight, owidth, in_chn, nframes), dtype=img.dtype) |
|
for t in range(nframes): |
|
frame = img[:, :, :, t] |
|
frame = cv.resize(frame, (owidth, oheight)).reshape( |
|
oheight, owidth, in_chn |
|
) |
|
|
|
resized_img[:, :, :, t] = frame |
|
resized_img = resized_img.reshape( |
|
resized_img.shape[0], resized_img.shape[1], chn |
|
) |
|
|
|
if is_flow: |
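            # Flow magnitudes scale with spatial size, so rescale them by the resize factor.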
|
|
|
|
|
resized_img = resized_img * oheight / ht |
|
return resized_img |
|
|
|
|
|
def color_normalize(self, x, mean, std): |
|
"""Normalize a tensor of images by subtracting (resp. dividing) by the mean (resp. |
|
std. deviation) statistics of a dataset in RGB space. |
|
""" |
|
if x.dim() in {3, 4}: |
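            # Single image (C, H, W) or single video (C, T, H, W): channels along dim 0.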
|
if x.size(0) == 1: |
|
x = x.repeat(3, 1, 1) |
|
assert x.size(0) == 3, "For single video format, expected RGB along first dim" |
|
for t, m, s in zip(x, mean, std): |
|
t.sub_(m) |
|
t.div_(s) |
|
elif x.dim() == 5: |
|
assert ( |
|
x.shape[1] == 3 |
|
), "For batched video format, expected RGB along second dim" |
|
x[:, 0].sub_(mean[0]).div_(std[0]) |
|
x[:, 1].sub_(mean[1]).div_(std[1]) |
|
x[:, 2].sub_(mean[2]).div_(std[2]) |
|
return x |
|
|
|
|
|
def to_torch(self, ndarray): |
|
|
|
if type(ndarray).__module__ == "numpy": |
|
return torch.from_numpy(ndarray) |
|
elif not torch.is_tensor(ndarray): |
|
raise ValueError(f"Cannot convert {type(ndarray)} to torch tensor") |
|
return ndarray |
|
|
|
|
|
def to_numpy(self, tensor): |
|
|
|
if torch.is_tensor(tensor): |
|
return tensor.cpu().numpy() |
|
elif type(tensor).__module__ != "numpy": |
|
raise ValueError(f"Cannot convert {type(tensor)} to numpy array") |
|
return tensor |
|
|
|
|
|
def im_to_numpy(self, img): |
|
|
|
img = self.to_numpy(img) |
|
img = np.transpose(img, (1, 2, 0)) |
|
|
|
return img |
|
|
|
|
|
def im_to_torch(self, img): |
|
|
|
img = np.transpose(img, (2, 0, 1)) |
|
img = self.to_torch(img).float() |
|
|
|
return img / 255 if img.max() > 1 else img |
|
|
|
|
|
def load_model(self, checkpoint_path: str, number_of_classes: int, number_of_frames: int) -> Module: |
|
|
|
model = DataParallel(InceptionI3d( |
|
number_of_classes, |
|
spatiotemporal_squeeze=True, |
|
final_endpoint='Logits', |
|
name="inception_i3d", |
|
in_channels=3, |
|
dropout_keep_prob=0.5, |
|
num_in_frames=number_of_frames |
|
)).cuda() |
|
|
|
        if not exists(checkpoint_path):
            # Reassemble the checkpoint from its split chunks on first use.
            call(f'cat app/checkpoints/* >> {checkpoint_path}', shell=True, stdout=DEVNULL)
|
|
|
checkpoint = torch.load(checkpoint_path) |
|
model.load_state_dict(checkpoint['state_dict']) |
|
model.eval() |
|
|
|
return model |
|
|
|
|
|
def load_vocabulary(self, vocabulary_path: str) -> dict: |
|
|
|
with open(vocabulary_path, 'rb') as file: |
|
return pickle.load(file) |
|
|
|
|
|
    def prepare_input(
        self,
        video: deque,
        input_resolution: int = 224,
        resize_resolution: int = 256,
        mean: torch.Tensor = 0.5 * torch.ones(3),
        std: torch.Tensor = 1.0 * torch.ones(3),
    ) -> torch.Tensor:
|
|
|
video_tensor = torch.stack( |
|
[self.im_to_torch(frame[:, :, [2, 1, 0]]) for frame in video] |
|
).permute(1, 0, 2, 3) |
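        # video_tensor: (3, nframes, H, W) in RGB order (OpenCV frames are BGR, hence the [2, 1, 0] reorder).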
|
|
|
iC, iF, _, _ = video_tensor.shape |
|
video_tensor_resized = np.zeros((iF, resize_resolution, resize_resolution, iC)) |
|
for t in range(iF): |
|
tmp = video_tensor[:, t, :, :] |
|
tmp = self.resize_generic( |
|
self.im_to_numpy(tmp), resize_resolution, resize_resolution, interp="bilinear", is_flow=False |
|
) |
|
video_tensor_resized[t] = tmp |
|
video_tensor_resized = np.transpose(video_tensor_resized, (3, 0, 1, 2)) |
|
|
|
ulx = int((resize_resolution - input_resolution) / 2) |
|
uly = int((resize_resolution - input_resolution) / 2) |
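        # Center-crop from resize_resolution down to input_resolution.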
|
|
|
video_tensor_resized = video_tensor_resized[:, :, uly : uly + input_resolution, ulx : ulx + input_resolution] |
|
video_tensor_resized = self.to_torch(video_tensor_resized).float() |
|
assert video_tensor_resized.max() <= 1 |
|
video_tensor_resized = self.color_normalize(video_tensor_resized, mean, std) |
|
return video_tensor_resized |
|
|
|
|
|
def sliding_windows(self, input_video: torch.Tensor, number_of_frames: int, stride: int) -> torch.Tensor: |
|
|
|
""" |
|
Return sliding windows and corresponding (middle) timestamp |
|
""" |
|
C, nFrames, H, W = input_video.shape |
|
|
|
if nFrames < number_of_frames: |
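            # Clip is shorter than one window: pad by repeating the last frame.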
|
rgb_ = torch.zeros(C, number_of_frames, H, W) |
|
rgb_[:, :nFrames] = input_video |
|
rgb_[:, nFrames:] = input_video[:, -1].unsqueeze(1) |
|
input_video = rgb_ |
|
nFrames = input_video.shape[1] |
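        # Number of windows needed to cover the clip, e.g. 100 frames with a
        # 16-frame window and stride 8 -> ceil((100 - 16) / 8) + 1 = 12 windows.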
|
|
|
num_clips = math.ceil((nFrames - number_of_frames) / stride) + 1 |
|
|
|
rgb_slided = torch.zeros(num_clips, 3, number_of_frames, H, W) |
|
|
|
for j in range(num_clips): |
|
|
|
stride_j = j * stride |
|
actual_clip_length = min(number_of_frames, nFrames - stride_j) |
|
t_beg = stride_j if actual_clip_length == number_of_frames else nFrames - number_of_frames |
|
rgb_slided[j] = input_video[:, t_beg : t_beg + number_of_frames, :, :] |
|
|
|
return rgb_slided |
|
|
|
|
|
def video_to_asl(self, video: deque): |
|
|
|
input_video = self.prepare_input(video) |
|
input_sliding_window = self.sliding_windows(input_video, Config.number_of_frames, Config.stride) |
|
|
|
num_clips = input_sliding_window.shape[0] |
|
|
|
num_batches = math.ceil(num_clips / Config.batch_size) |
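        # Score each window in batches of Config.batch_size and stack the raw class scores.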
|
raw_scores = np.empty((0, Config.number_of_classes), dtype=float) |
|
        for b in range(num_batches):
            inp = input_sliding_window[b * Config.batch_size : (b + 1) * Config.batch_size]
            # Inference only: run without autograd so no graph is built per batch.
            with torch.no_grad():
                out = self.model(inp)
            raw_scores = np.append(raw_scores, out["logits"].cpu().numpy(), axis=0)
|
prob_scores = scipy.special.softmax(raw_scores, axis=1) |
|
prob_sorted = np.sort(prob_scores, axis=1)[:, ::-1] |
|
pred_sorted = np.argsort(prob_scores, axis=1)[:, ::-1] |
|
|
|
word_topk = np.empty((Config.topk, num_clips), dtype=object) |
|
for k in range(Config.topk): |
|
for i, p in enumerate(pred_sorted[:, k]): |
|
word_topk[k, i] = self.word_data["words"][p] |
|
prob_topk = prob_sorted[:, :Config.topk].transpose() |
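        # Keep only the first window's top word, and only if it clears the confidence threshold.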
|
|
|
|
|
self.result = "" if prob_topk[0, 0] <= self.confidence else word_topk[0, 0] |
|
|