import math
import pickle
from collections import deque
from os.path import exists
from subprocess import call, DEVNULL
import cv2 as cv
import numpy as np
import scipy.ndimage
import scipy.special
import torch
from torch.nn import DataParallel, Module
from app.config import Config
from app.models import InceptionI3d
class Translator:
def __init__(self, confidence: float):
self.confidence = confidence
self.model = self.load_model(Config.checkpoint_path, Config.number_of_classes, Config.number_of_frames)
self.word_data = self.load_vocabulary(Config.vocabulary_path)
self.result = ""
def resize_generic(self, img, oheight, owidth, interp="bilinear", is_flow=False):
"""
Args
inp: numpy array: RGB image (H, W, 3) | video with 3*nframes (H, W, 3*nframes)
| single channel image (H, W, 1) | -- not supported: video with (nframes, 3, H, W)
"""
        ht = img.shape[0]
        wd = img.shape[1]
        chn = img.shape[2]
        if chn == 1:
            # scipy.misc.imresize has been removed from SciPy; resize with OpenCV instead
            # (bilinear, matching the default interp).
            resized_img = cv.resize(
                img.squeeze(), (owidth, oheight), interpolation=cv.INTER_LINEAR
            ).reshape((oheight, owidth, chn))
elif chn == 3:
# resized_img = scipy.misc.imresize(img, [oheight, owidth], interp=interp) # mode='F' gives an error for 3 channels
resized_img = cv.resize(img, (owidth, oheight)) # inverted compared to scipy
        elif chn == 2:
            # A 2-channel input is assumed to be an optical-flow field; resize each channel separately.
            resized_img = np.zeros((oheight, owidth, chn), dtype=img.dtype)
            for t in range(chn):
                # scipy.ndimage.zoom expects per-axis zoom factors, not an output size;
                # order=1 corresponds to bilinear interpolation.
                resized_img[:, :, t] = scipy.ndimage.zoom(
                    img[:, :, t], [oheight / ht, owidth / wd], order=1
                )
else:
in_chn = 3
# Workaround, would be better to pass #frames
if chn == 16:
in_chn = 1
if chn == 32:
in_chn = 2
nframes = int(chn / in_chn)
img = img.reshape(img.shape[0], img.shape[1], in_chn, nframes)
resized_img = np.zeros((oheight, owidth, in_chn, nframes), dtype=img.dtype)
for t in range(nframes):
frame = img[:, :, :, t] # img[:, :, t*3:t*3+3]
frame = cv.resize(frame, (owidth, oheight)).reshape(
oheight, owidth, in_chn
)
# frame = scipy.misc.imresize(frame, [oheight, owidth], interp=interp)
resized_img[:, :, :, t] = frame
resized_img = resized_img.reshape(
resized_img.shape[0], resized_img.shape[1], chn
)
        if is_flow:
            # Flow values are pixel displacements, so rescale them by the spatial resize factor.
            resized_img = resized_img * oheight / ht
        return resized_img
def color_normalize(self, x, mean, std):
"""Normalize a tensor of images by subtracting (resp. dividing) by the mean (resp.
std. deviation) statistics of a dataset in RGB space.
"""
if x.dim() in {3, 4}:
if x.size(0) == 1:
x = x.repeat(3, 1, 1)
assert x.size(0) == 3, "For single video format, expected RGB along first dim"
for t, m, s in zip(x, mean, std):
t.sub_(m)
t.div_(s)
elif x.dim() == 5:
assert (
x.shape[1] == 3
), "For batched video format, expected RGB along second dim"
x[:, 0].sub_(mean[0]).div_(std[0])
x[:, 1].sub_(mean[1]).div_(std[1])
x[:, 2].sub_(mean[2]).div_(std[2])
return x
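    # With the defaults used by prepare_input (mean=0.5, std=1.0) this maps pixel values
    # from [0, 1] to [-0.5, 0.5]; the sub_/div_ calls modify the tensor in place.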
def to_torch(self, ndarray):
if type(ndarray).__module__ == "numpy":
return torch.from_numpy(ndarray)
elif not torch.is_tensor(ndarray):
raise ValueError(f"Cannot convert {type(ndarray)} to torch tensor")
return ndarray
def to_numpy(self, tensor):
if torch.is_tensor(tensor):
return tensor.cpu().numpy()
elif type(tensor).__module__ != "numpy":
raise ValueError(f"Cannot convert {type(tensor)} to numpy array")
return tensor
def im_to_numpy(self, img):
img = self.to_numpy(img)
img = np.transpose(img, (1, 2, 0)) # H*W*C
return img
def im_to_torch(self, img):
img = np.transpose(img, (2, 0, 1)) # C*H*W
img = self.to_torch(img).float()
return img / 255 if img.max() > 1 else img
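    # im_to_torch yields CHW float tensors; the max() > 1 check scales 8-bit images to
    # [0, 1] while leaving already-normalized inputs untouched.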
def load_model(self, checkpoint_path: str, number_of_classes: int, number_of_frames: int) -> Module:
model = DataParallel(InceptionI3d(
number_of_classes,
spatiotemporal_squeeze=True,
final_endpoint='Logits',
name="inception_i3d",
in_channels=3,
dropout_keep_prob=0.5,
num_in_frames=number_of_frames
)).cuda()
        if not exists(checkpoint_path):
            # The checkpoint is shipped as split parts; reassemble it into a single file on first use.
            call(f'cat app/checkpoints/* >> {checkpoint_path}', shell=True, stdout=DEVNULL)
checkpoint = torch.load(checkpoint_path)
model.load_state_dict(checkpoint['state_dict'])
model.eval()
return model
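    # The wrapped InceptionI3d consumes clips shaped (batch, 3, num_in_frames, H, W)
    # (224x224 as prepared by prepare_input below) and returns a dict whose "logits"
    # entry has shape (batch, number_of_classes); see video_to_asl.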
def load_vocabulary(self, vocabulary_path: str) -> dict:
with open(vocabulary_path, 'rb') as file:
return pickle.load(file)
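    # The pickled vocabulary is a dict; the only key used here is "words", a list that
    # maps a class index to its corresponding word (see video_to_asl).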
    def prepare_input(
        self,
        video: deque,
        input_resolution: int = 224,
        resize_resolution: int = 256,
        mean: torch.Tensor = 0.5 * torch.ones(3),
        std: torch.Tensor = 1.0 * torch.ones(3),
    ) -> torch.Tensor:
video_tensor = torch.stack(
[self.im_to_torch(frame[:, :, [2, 1, 0]]) for frame in video]
).permute(1, 0, 2, 3)
iC, iF, _, _ = video_tensor.shape
video_tensor_resized = np.zeros((iF, resize_resolution, resize_resolution, iC))
for t in range(iF):
tmp = video_tensor[:, t, :, :]
tmp = self.resize_generic(
self.im_to_numpy(tmp), resize_resolution, resize_resolution, interp="bilinear", is_flow=False
)
video_tensor_resized[t] = tmp
video_tensor_resized = np.transpose(video_tensor_resized, (3, 0, 1, 2))
        # Center crop coordinates
        ulx = int((resize_resolution - input_resolution) / 2)
        uly = int((resize_resolution - input_resolution) / 2)
        # Crop to input_resolution x input_resolution (224x224 by default)
        video_tensor_resized = video_tensor_resized[:, :, uly : uly + input_resolution, ulx : ulx + input_resolution]
video_tensor_resized = self.to_torch(video_tensor_resized).float()
assert video_tensor_resized.max() <= 1
video_tensor_resized = self.color_normalize(video_tensor_resized, mean, std)
return video_tensor_resized
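    # prepare_input shape summary: a deque of (H, W, 3) BGR frames is converted to RGB,
    # stacked into a (3, T, H, W) tensor, resized per frame to resize_resolution (256),
    # center-cropped to input_resolution (224) and normalized with the given mean/std.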
def sliding_windows(self, input_video: torch.Tensor, number_of_frames: int, stride: int) -> torch.Tensor:
"""
Return sliding windows and corresponding (middle) timestamp
"""
C, nFrames, H, W = input_video.shape
# If needed, pad to the minimum clip length
if nFrames < number_of_frames:
rgb_ = torch.zeros(C, number_of_frames, H, W)
rgb_[:, :nFrames] = input_video
rgb_[:, nFrames:] = input_video[:, -1].unsqueeze(1)
input_video = rgb_
nFrames = input_video.shape[1]
num_clips = math.ceil((nFrames - number_of_frames) / stride) + 1
rgb_slided = torch.zeros(num_clips, 3, number_of_frames, H, W)
        # For each clip
        for j in range(num_clips):
            stride_j = j * stride
            actual_clip_length = min(number_of_frames, nFrames - stride_j)
            # If the window would run past the last frame, shift its start back so it still spans a full clip.
            t_beg = stride_j if actual_clip_length == number_of_frames else nFrames - number_of_frames
rgb_slided[j] = input_video[:, t_beg : t_beg + number_of_frames, :, :]
return rgb_slided
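    # Example: with nFrames=100, number_of_frames=64 and stride=8,
    # num_clips = ceil((100 - 64) / 8) + 1 = 6 and the windows start at frames
    # 0, 8, 16, 24, 32 and 36 (the last start is clamped so the window still spans 64 frames).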
def video_to_asl(self, video: deque):
input_video = self.prepare_input(video)
input_sliding_window = self.sliding_windows(input_video, Config.number_of_frames, Config.stride)
num_clips = input_sliding_window.shape[0]
# Group the clips into batches
num_batches = math.ceil(num_clips / Config.batch_size)
raw_scores = np.empty((0, Config.number_of_classes), dtype=float)
for b in range(num_batches):
inp = input_sliding_window[b * Config.batch_size : (b + 1) * Config.batch_size]
            # Forward pass (no gradients are needed at inference time)
            with torch.no_grad():
                out = self.model(inp)
raw_scores = np.append(raw_scores, out["logits"].cpu().detach().numpy(), axis=0)
prob_scores = scipy.special.softmax(raw_scores, axis=1)
prob_sorted = np.sort(prob_scores, axis=1)[:, ::-1]
pred_sorted = np.argsort(prob_scores, axis=1)[:, ::-1]
word_topk = np.empty((Config.topk, num_clips), dtype=object)
for k in range(Config.topk):
for i, p in enumerate(pred_sorted[:, k]):
word_topk[k, i] = self.word_data["words"][p]
prob_topk = prob_sorted[:, :Config.topk].transpose()
# print(np.array([word_topk, prob_topk]).transpose())
self.result = "" if prob_topk[0, 0] <= self.confidence else word_topk[0, 0]