import os
import csv
import numpy as np
from copy import deepcopy
from PIL import Image
from pycocotools import mask as mask_utils
from scipy.optimize import linear_sum_assignment
from trackeval.baselines.pascal_colormap import pascal_colormap


def load_seq(file_to_load):
    """ Load input data from file in RobMOTS format (e.g. provided detections).
    Returns: Data object with the following structure (see STP):
        data['cls'][t] = {'ids', 'scores', 'im_hs', 'im_ws', 'mask_rles'}
    """
    fp = open(file_to_load)
    dialect = csv.Sniffer().sniff(fp.readline(), delimiters=' ')  # Auto-detect the file's formatting.
    dialect.skipinitialspace = True
    fp.seek(0)
    reader = csv.reader(fp, dialect)
    read_data = {}
    num_timesteps = 0
    for i, row in enumerate(reader):
        if row[-1] == '':  # Drop a trailing empty field caused by a trailing space.
            row = row[:-1]
        t = int(row[0])
        cid = row[1]
        c = int(row[2])
        s = row[3]
        h = row[4]
        w = row[5]
        rle = row[6]

        if t >= num_timesteps:
            num_timesteps = t + 1

        if c in read_data.keys():
            if t in read_data[c].keys():
                read_data[c][t]['ids'].append(cid)
                read_data[c][t]['scores'].append(s)
                read_data[c][t]['im_hs'].append(h)
                read_data[c][t]['im_ws'].append(w)
                read_data[c][t]['mask_rles'].append(rle)
            else:
                read_data[c][t] = {}
                read_data[c][t]['ids'] = [cid]
                read_data[c][t]['scores'] = [s]
                read_data[c][t]['im_hs'] = [h]
                read_data[c][t]['im_ws'] = [w]
                read_data[c][t]['mask_rles'] = [rle]
        else:
            read_data[c] = {t: {}}
            read_data[c][t]['ids'] = [cid]
            read_data[c][t]['scores'] = [s]
            read_data[c][t]['im_hs'] = [h]
            read_data[c][t]['im_ws'] = [w]
            read_data[c][t]['mask_rles'] = [rle]
    fp.close()

    # Convert the raw per-class, per-timestep lists into numpy arrays.
    data = {}
    for c in read_data.keys():
        data[c] = [{} for _ in range(num_timesteps)]
        for t in range(num_timesteps):
            if t in read_data[c].keys():
                data[c][t]['ids'] = np.atleast_1d(read_data[c][t]['ids']).astype(int)
                data[c][t]['scores'] = np.atleast_1d(read_data[c][t]['scores']).astype(float)
                data[c][t]['im_hs'] = np.atleast_1d(read_data[c][t]['im_hs']).astype(int)
                data[c][t]['im_ws'] = np.atleast_1d(read_data[c][t]['im_ws']).astype(int)
                data[c][t]['mask_rles'] = np.atleast_1d(read_data[c][t]['mask_rles']).astype(str)
            else:
                data[c][t]['ids'] = np.empty(0).astype(int)
                data[c][t]['scores'] = np.empty(0).astype(float)
                data[c][t]['im_hs'] = np.empty(0).astype(int)
                data[c][t]['im_ws'] = np.empty(0).astype(int)
                data[c][t]['mask_rles'] = np.empty(0).astype(str)
    return data
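
# Usage sketch (not part of the original module; the file path and the class id 1 used
# below are hypothetical):
#
#   data = load_seq('/path/to/detections/seq_name.txt')
#   dets_t0 = data[1][0]   # detections of class 1 at timestep 0
#   print(dets_t0['ids'], dets_t0['scores'])
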
def threshold(tdata, thresh):
    """ Removes detections below a certain threshold ('thresh') score. """
    new_data = {}
    to_keep = tdata['scores'] > thresh
    for field in ['ids', 'scores', 'im_hs', 'im_ws', 'mask_rles']:
        new_data[field] = tdata[field][to_keep]
    return new_data
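
# Usage sketch (assumes 'data' in the format returned by load_seq; the 0.5 score
# threshold is an arbitrary example value):
#
#   for cls in data:
#       data[cls] = [threshold(t_data, thresh=0.5) for t_data in data[cls]]
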
def create_coco_mask(mask_rles, im_hs, im_ws):
    """ Converts masks given as RLE text (+ height and width) to the encoded format used by pycocotools. """
    coco_masks = [{'size': [h, w], 'counts': m.encode(encoding='UTF-8')}
                  for h, w, m in zip(im_hs, im_ws, mask_rles)]
    return coco_masks


def mask_iou(mask_rles1, mask_rles2, im_hs, im_ws, do_ioa=0):
    """ Calculates the mask IoU between two sets of masks.
    Further allows 'intersection over area' (IoA) instead of IoU, computed over the area of the masks in mask_rles1.
    do_ioa can be passed either as a single boolean applied to all of mask_rles2, or as one boolean per mask in
    mask_rles2.
    It is recommended that mask_rles1 are detections and mask_rles2 are ground-truth masks.
    """
    coco_masks1 = create_coco_mask(mask_rles1, im_hs, im_ws)
    coco_masks2 = create_coco_mask(mask_rles2, im_hs, im_ws)

    if not hasattr(do_ioa, "__len__"):
        do_ioa = [do_ioa] * len(coco_masks2)
    assert len(coco_masks2) == len(do_ioa)
    if len(coco_masks1) == 0 or len(coco_masks2) == 0:
        iou = np.zeros((len(coco_masks1), len(coco_masks2)))
    else:
        iou = mask_utils.iou(coco_masks1, coco_masks2, do_ioa)
    return iou
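
# Usage sketch (not part of the original module). The two example masks are encoded with
# pycocotools on the fly, just to obtain valid RLE strings in the format this module
# expects (the 'counts' string of an encoded mask):
#
#   m1 = np.zeros((10, 10), dtype=np.uint8); m1[2:6, 2:6] = 1
#   m2 = np.zeros((10, 10), dtype=np.uint8); m2[4:8, 4:8] = 1
#   rle1 = mask_utils.encode(np.asfortranarray(m1))['counts'].decode('utf-8')
#   rle2 = mask_utils.encode(np.asfortranarray(m2))['counts'].decode('utf-8')
#   ious = mask_iou([rle1], [rle2], im_hs=[10], im_ws=[10])
#   # ious has shape (1, 1); here ious[0, 0] = 4 / 28 ≈ 0.14.
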
def sort_by_score(t_data):
    """ Sorts data by score """
    sort_index = np.argsort(t_data['scores'])[::-1]
    for k in t_data.keys():
        t_data[k] = t_data[k][sort_index]
    return t_data


def mask_NMS(t_data, nms_threshold=0.5, already_sorted=False):
    """ Remove redundant masks by performing non-maximum suppression (NMS) """

    # Sort by score
    if not already_sorted:
        t_data = sort_by_score(t_data)

    # Calculate the mask IoU between all detections in the timestep.
    mask_ious_all = mask_iou(t_data['mask_rles'], t_data['mask_rles'], t_data['im_hs'], t_data['im_ws'])

    # Determine which masks NMS should remove
    # (those overlapping greater than nms_threshold with another mask that has a higher score)
    num_dets = len(t_data['mask_rles'])
    to_remove = [False for _ in range(num_dets)]
    for i in range(num_dets):
        if not to_remove[i]:
            for j in range(i + 1, num_dets):
                if mask_ious_all[i, j] > nms_threshold:
                    to_remove[j] = True

    # Remove detections which should be removed
    to_keep = np.logical_not(to_remove)
    for k in t_data.keys():
        t_data[k] = t_data[k][to_keep]

    return t_data
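
# Usage sketch (assumes 'dets' is a single-timestep detection dict with numpy array
# fields 'ids', 'scores', 'im_hs', 'im_ws' and 'mask_rles', e.g. one entry of the data
# returned by load_seq):
#
#   dets = mask_NMS(dets, nms_threshold=0.5)
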
def non_overlap(t_data, already_sorted=False):
    """ Enforces masks to be non-overlapping in an image by placing the masks 'on top of one another',
    such that higher scoring masks 'occlude' and thus remove parts of lower scoring masks.

    Help wanted: if anyone knows a way to do this WITHOUT converting the RLE to a np.array let me know, because that
    would be MUCH more efficient. (I have tried, but haven't yet had success).
    """

    # Sort by score
    if not already_sorted:
        t_data = sort_by_score(t_data)

    # Get coco masks
    coco_masks = create_coco_mask(t_data['mask_rles'], t_data['im_hs'], t_data['im_ws'])

    # Create a single np.array to hold all of the non-overlapping masks.
    masks_array = np.zeros((t_data['im_hs'][0], t_data['im_ws'][0]), 'uint8')

    # Decode each mask into a np.array, and place it into the overall array for the whole frame.
    # Since masks with the lowest score are placed first, they are 'partially overridden' by masks with a higher
    # score if they overlap.
    for i, mask in enumerate(coco_masks[::-1]):
        masks_array[mask_utils.decode(mask).astype('bool')] = i + 1

    # Encode the resulting np.array back into a set of coco_masks which are now non-overlapping.
    num_dets = len(coco_masks)
    for i, j in enumerate(range(1, num_dets + 1)[::-1]):
        coco_masks[i] = mask_utils.encode(np.asfortranarray(masks_array == j, dtype=np.uint8))

    # Convert from coco_mask format back into our mask_rle format.
    t_data['mask_rles'] = [m['counts'].decode("utf-8") for m in coco_masks]

    return t_data
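
# Usage sketch (same assumed 'dets' dict as above; non_overlap is typically run after
# mask_NMS, so re-sorting can be skipped):
#
#   dets = mask_NMS(dets, nms_threshold=0.5)
#   dets = non_overlap(dets, already_sorted=True)
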
def masks2boxes(mask_rles, im_hs, im_ws):
    """ Extracts bounding boxes which surround a set of masks. """
    coco_masks = create_coco_mask(mask_rles, im_hs, im_ws)
    boxes = np.array([mask_utils.toBbox(x) for x in coco_masks])
    if len(boxes) == 0:
        boxes = np.empty((0, 4))
    return boxes
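
# Usage sketch (continuing with the assumed 'dets' dict from above):
#
#   boxes = masks2boxes(dets['mask_rles'], dets['im_hs'], dets['im_ws'])
#   # 'boxes' is an (N, 4) array in [x0, y0, w, h] layout (pycocotools toBbox format).
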
def box_iou(bboxes1, bboxes2, box_format='xywh', do_ioa=False, do_giou=False):
    """ Calculates the IoU (intersection over union) between two arrays of boxes.
    Allows variable box formats ('xywh' and 'x0y0x1y1').
    If do_ioa (intersection over area), then calculates the intersection over the area of boxes1 - this is commonly
    used to determine if detections are within crowd ignore regions.
    If do_giou (generalized intersection over union), then calculates GIoU instead.
    """
    if len(bboxes1) == 0 or len(bboxes2) == 0:
        ious = np.zeros((len(bboxes1), len(bboxes2)))
        return ious
    if box_format == 'xywh':
        # layout: (x0, y0, w, h)
        bboxes1 = deepcopy(bboxes1)
        bboxes2 = deepcopy(bboxes2)

        bboxes1[:, 2] = bboxes1[:, 0] + bboxes1[:, 2]
        bboxes1[:, 3] = bboxes1[:, 1] + bboxes1[:, 3]
        bboxes2[:, 2] = bboxes2[:, 0] + bboxes2[:, 2]
        bboxes2[:, 3] = bboxes2[:, 1] + bboxes2[:, 3]
    elif box_format != 'x0y0x1y1':
        raise Exception('box_format %s is not implemented' % box_format)

    # layout: (x0, y0, x1, y1)
    min_ = np.minimum(bboxes1[:, np.newaxis, :], bboxes2[np.newaxis, :, :])
    max_ = np.maximum(bboxes1[:, np.newaxis, :], bboxes2[np.newaxis, :, :])
    intersection = np.maximum(min_[..., 2] - max_[..., 0], 0) * np.maximum(min_[..., 3] - max_[..., 1], 0)
    area1 = (bboxes1[..., 2] - bboxes1[..., 0]) * (bboxes1[..., 3] - bboxes1[..., 1])

    if do_ioa:
        ioas = np.zeros_like(intersection)
        valid_mask = area1 > 0 + np.finfo('float').eps
        ioas[valid_mask, :] = intersection[valid_mask, :] / area1[valid_mask][:, np.newaxis]
        return ioas
    else:
        area2 = (bboxes2[..., 2] - bboxes2[..., 0]) * (bboxes2[..., 3] - bboxes2[..., 1])
        union = area1[:, np.newaxis] + area2[np.newaxis, :] - intersection
        intersection[area1 <= 0 + np.finfo('float').eps, :] = 0
        intersection[:, area2 <= 0 + np.finfo('float').eps] = 0
        intersection[union <= 0 + np.finfo('float').eps] = 0
        union[union <= 0 + np.finfo('float').eps] = 1
        ious = intersection / union

        if do_giou:
            enclosing_area = np.maximum(max_[..., 2] - min_[..., 0], 0) * np.maximum(max_[..., 3] - min_[..., 1], 0)
            eps = 1e-7
            # GIoU = IoU - (enclosing area not covered by the union) / (enclosing area)
            ious = ious - ((enclosing_area - union) / (enclosing_area + eps))

        return ious
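
# Usage sketch with two hand-made boxes in 'xywh' layout (the values are arbitrary):
#
#   b1 = np.array([[0., 0., 10., 10.]])
#   b2 = np.array([[5., 5., 10., 10.]])
#   ious = box_iou(b1, b2, box_format='xywh')   # ious[0, 0] = 25 / 175 ≈ 0.14
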
def match(match_scores):
    """ Runs linear sum assignment (Hungarian algorithm) to find the matching which maximises the total score. """
    match_rows, match_cols = linear_sum_assignment(-match_scores)
    return match_rows, match_cols
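
# Usage sketch: matching on a small made-up score matrix, where rows could be existing
# tracks and columns current detections (higher score = better match):
#
#   scores = np.array([[0.9, 0.1],
#                      [0.2, 0.8]])
#   rows, cols = match(scores)   # rows = [0, 1], cols = [0, 1]
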
def write_seq(output_data, out_file):
    """ Writes a sequence out to file as space-separated rows (RobMOTS format). """
    out_loc = os.path.dirname(out_file)
    if not os.path.exists(out_loc):
        os.makedirs(out_loc, exist_ok=True)
    fp = open(out_file, 'w', newline='')
    writer = csv.writer(fp, delimiter=' ')
    for row in output_data:
        writer.writerow(row)
    fp.close()
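
# Usage sketch (the output path is hypothetical and the row variables are placeholders;
# each row uses the same space-separated layout that load_seq reads:
# timestep, id, class, score, im_h, im_w, mask_rle):
#
#   rows = [[t, det_id, cls, score, im_h, im_w, mask_rle]]
#   write_seq(rows, '/path/to/output/seq_name.txt')
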
def combine_classes(data):
    """ Converts data from a class-separated to a class-combined format.
    Input format: data['cls'][t] = {'ids', 'scores', 'im_hs', 'im_ws', 'mask_rles'}
    Output format: data[t] = {'ids', 'scores', 'im_hs', 'im_ws', 'mask_rles', 'cls'}
    """
    output_data = [{} for _ in list(data.values())[0]]
    for cls, cls_data in data.items():
        for timestep, t_data in enumerate(cls_data):
            for k in t_data.keys():
                if k in output_data[timestep].keys():
                    output_data[timestep][k] += list(t_data[k])
                else:
                    output_data[timestep][k] = list(t_data[k])
            # Add one class label per detection from this class (not per detection accumulated so far).
            if 'cls' in output_data[timestep].keys():
                output_data[timestep]['cls'] += [cls] * len(t_data['ids'])
            else:
                output_data[timestep]['cls'] = [cls] * len(t_data['ids'])

    for timestep, t_data in enumerate(output_data):
        for k in t_data.keys():
            output_data[timestep][k] = np.array(output_data[timestep][k])

    return output_data
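
# Usage sketch (assumes 'data' in the class-separated format returned by load_seq):
#
#   combined = combine_classes(data)
#   dets_t0 = combined[0]   # all classes at timestep 0, with an added 'cls' field
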
def save_as_png(t_data, out_file, im_h, im_w):
    """ Save a set of segmentation masks into a PNG format, the same as used for the DAVIS dataset."""

    if len(t_data['mask_rles']) > 0:
        coco_masks = create_coco_mask(t_data['mask_rles'], t_data['im_hs'], t_data['im_ws'])
        list_of_np_masks = [mask_utils.decode(mask) for mask in coco_masks]

        png = np.zeros((t_data['im_hs'][0], t_data['im_ws'][0]))
        for mask, c_id in zip(list_of_np_masks, t_data['ids']):
            png[mask.astype("bool")] = c_id + 1
    else:
        png = np.zeros((im_h, im_w))

    if not os.path.exists(os.path.dirname(out_file)):
        os.makedirs(os.path.dirname(out_file))

    colmap = (np.array(pascal_colormap) * 255).round().astype("uint8")
    palimage = Image.new('P', (16, 16))
    palimage.putpalette(colmap)
    im = Image.fromarray(np.squeeze(png.astype("uint8")))
    im2 = im.quantize(palette=palimage)
    im2.save(out_file)
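
# Usage sketch (assumes 'dets' as above and 'data' from load_seq; the output path is
# hypothetical, and get_frame_size below provides the fallback frame size used when a
# timestep has no masks):
#
#   im_h, im_w = get_frame_size(data)
#   save_as_png(dets, '/path/to/output/seq_name/000000.png', im_h, im_w)
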
def get_frame_size(data):
    """ Gets frame height and width from data. """
    for cls, cls_data in data.items():
        for timestep, t_data in enumerate(cls_data):
            if len(t_data['im_hs']) > 0:
                im_h = t_data['im_hs'][0]
                im_w = t_data['im_ws'][0]
                return im_h, im_w
    return None