# Page residue kept for provenance: xfys's picture / Upload 645 files / 47af768
import os
import csv
import numpy as np
from copy import deepcopy
from PIL import Image
from pycocotools import mask as mask_utils
from scipy.optimize import linear_sum_assignment
from trackeval.baselines.pascal_colormap import pascal_colormap
def load_seq(file_to_load):
    """ Load input data from file in RobMOTS format (e.g. provided detections).

    Each non-empty row is read as space-separated columns in this order:
    timestep, id, class, score, image height, image width, mask RLE.
    A trailing empty column (caused by a trailing space) is dropped, and rows that are blank are skipped.

    Returns: Data object with the following structure:
        data[cls][t] = {'ids', 'scores', 'im_hs', 'im_ws', 'mask_rles'}
    where cls is the integer class, t runs from 0 to the largest timestep found in the file, and every field is a
    1D numpy array (of length 0 for timesteps in which the class has no rows).
    """
    # Field name -> (column index in a row, dtype of the output array).
    columns = {'ids': (1, int), 'scores': (3, float), 'im_hs': (4, int), 'im_ws': (5, int), 'mask_rles': (6, str)}

    read_data = {}
    num_timesteps = 0
    # The context manager guarantees the file is closed even if a row fails to parse.
    with open(file_to_load) as fp:
        dialect = csv.Sniffer().sniff(fp.readline(), delimiters=' ')
        dialect.skipinitialspace = True
        fp.seek(0)
        for row in csv.reader(fp, dialect):
            # A trailing space produces an empty last column.
            if row and row[-1] == '':
                row = row[:-1]
            # Blank (or whitespace-only) lines carry no detection.
            if not row:
                continue
            t = int(row[0])
            c = int(row[2])
            if t >= num_timesteps:
                num_timesteps = t + 1
            frame = read_data.setdefault(c, {}).setdefault(t, {field: [] for field in columns})
            for field, (col, _) in columns.items():
                frame[field].append(row[col])

    data = {}
    for c, cls_frames in read_data.items():
        data[c] = []
        for t in range(num_timesteps):
            if t in cls_frames:
                t_data = {field: np.atleast_1d(cls_frames[t][field]).astype(dtype)
                          for field, (_, dtype) in columns.items()}
            else:
                t_data = {field: np.empty(0).astype(dtype) for field, (_, dtype) in columns.items()}
            data[c].append(t_data)
    return data
def threshold(tdata, thresh):
    """ Keeps only the detections whose score is strictly greater than 'thresh'.

    Returns a new dict holding the filtered 'ids', 'scores', 'im_hs', 'im_ws' and 'mask_rles' fields;
    'tdata' itself is left untouched.
    """
    keep_mask = tdata['scores'] > thresh
    return {name: tdata[name][keep_mask] for name in ('ids', 'scores', 'im_hs', 'im_ws', 'mask_rles')}
def create_coco_mask(mask_rles, im_hs, im_ws):
    """ Builds the dict form of each mask that is handed to pycocotools.

    Every mask becomes {'size': [height, width], 'counts': <rle text encoded as UTF-8 bytes>}.
    The three inputs are walked in lockstep, so the result is as long as the shortest of them.
    """
    coco_masks = []
    for height, width, rle_text in zip(im_hs, im_ws, mask_rles):
        coco_masks.append({'size': [height, width], 'counts': rle_text.encode(encoding='UTF-8')})
    return coco_masks
def mask_iou(mask_rles1, mask_rles2, im_hs, im_ws, do_ioa=0):
    """ Calculate mask IoU between two sets of masks.
    Further allows 'intersection over area' instead of IoU (over the area of mask_rle1).
    Allows either to pass in 1 boolean for do_ioa for all mask_rles2 or also one for each mask_rles2.
    It is recommended that mask_rles1 is a detection and mask_rles2 is a groundtruth.

    The same im_hs / im_ws are used to build the coco masks of both sets.
    Returns the result of pycocotools' mask iou for the two sets, or an all-zero array of shape
    (len(masks1), len(masks2)) when either set is empty.
    """
    coco_masks1 = create_coco_mask(mask_rles1, im_hs, im_ws)
    coco_masks2 = create_coco_mask(mask_rles2, im_hs, im_ws)

    # A single flag is broadcast to one flag per mask in the second set.
    if not hasattr(do_ioa, "__len__"):
        do_ioa = [do_ioa] * len(coco_masks2)
    assert (len(coco_masks2) == len(do_ioa))

    if len(coco_masks1) == 0 or len(coco_masks2) == 0:
        # The shape must be passed as ONE tuple: np.zeros(n, m) would treat m as the dtype and raise.
        return np.zeros((len(coco_masks1), len(coco_masks2)))
    return mask_utils.iou(coco_masks1, coco_masks2, do_ioa)
def sort_by_score(t_data):
    """ Reorders every field of a timestep so that detections run from highest to lowest score.

    The dict is updated in place and also returned.
    """
    descending = np.argsort(t_data['scores'])[::-1]
    t_data.update({field: values[descending] for field, values in t_data.items()})
    return t_data
def mask_NMS(t_data, nms_threshold=0.5, already_sorted=False):
    """ Non-maximum suppression (NMS) on the masks of one timestep.

    Detections are visited from highest to lowest score; every detection that has not been suppressed itself
    suppresses all lower-ranked detections whose mask IoU with it exceeds 'nms_threshold'.
    The fields of t_data are filtered in place and t_data is returned.
    """
    # Bring the detections into descending score order unless the caller already did so.
    if not already_sorted:
        t_data = sort_by_score(t_data)

    # Pairwise mask IoU between all detections of the timestep.
    rles = t_data['mask_rles']
    pairwise_iou = mask_iou(rles, rles, t_data['im_hs'], t_data['im_ws'])

    count = len(rles)
    suppressed = np.zeros(count, dtype=bool)
    for winner in range(count):
        # A suppressed detection does not get to suppress others.
        if suppressed[winner]:
            continue
        for other in range(winner + 1, count):
            if pairwise_iou[winner, other] > nms_threshold:
                suppressed[other] = True

    # Drop the suppressed detections from every field.
    survivors = ~suppressed
    for field in t_data.keys():
        t_data[field] = t_data[field][survivors]
    return t_data
def non_overlap(t_data, already_sorted=False):
    """ Enforces masks to be non-overlapping in an image, does this by putting masks 'on top of one another',
    such that higher score masks 'occlude' and thus remove parts of lower scoring masks.

    t_data is updated in place (sorted by score unless 'already_sorted', and with 'mask_rles' replaced by the
    non-overlapping masks as a numpy array of str) and returned. A timestep without detections is returned
    unchanged.

    Help wanted: if anyone knows a way to do this WITHOUT converting the RLE to the np.array let me know, because that
    would be MUCH more efficient. (I have tried, but haven't yet had success).
    """
    # Nothing to separate, and there is no im_hs[0] / im_ws[0] to size the canvas with.
    if len(t_data['mask_rles']) == 0:
        return t_data

    # Sort by score
    if not already_sorted:
        t_data = sort_by_score(t_data)

    # Get coco masks
    coco_masks = create_coco_mask(t_data['mask_rles'], t_data['im_hs'], t_data['im_ws'])
    num_dets = len(coco_masks)

    # Single label canvas for the whole frame. int32 rather than uint8 so that labels above 255 do not wrap
    # around (a wrapped label would collide with another mask or with the background value 0).
    masks_array = np.zeros((t_data['im_hs'][0], t_data['im_ws'][0]), dtype=np.int32)

    # Paint from lowest to highest score: later (higher score) masks override earlier ones where they overlap.
    # The detection at sorted index i therefore ends up with label num_dets - i.
    for label, mask in enumerate(reversed(coco_masks), start=1):
        masks_array[mask_utils.decode(mask).astype('bool')] = label

    # Encode each label region back into a coco mask; these are now non-overlapping.
    for i in range(num_dets):
        coco_masks[i] = mask_utils.encode(np.asfortranarray(masks_array == (num_dets - i), dtype=np.uint8))

    # Convert from coco_mask back into our mask_rle format, as an array like the other fields of t_data.
    t_data['mask_rles'] = np.array([m['counts'].decode("utf-8") for m in coco_masks], dtype=str)
    return t_data
def masks2boxes(mask_rles, im_hs, im_ws):
    """ Computes, via pycocotools' toBbox, the bounding box of each mask.

    Returns one row per mask; when there are no masks an empty array of shape (0, 4) is returned.
    """
    coco_masks = create_coco_mask(mask_rles, im_hs, im_ws)
    if not coco_masks:
        return np.empty((0, 4))
    return np.array([mask_utils.toBbox(coco_mask) for coco_mask in coco_masks])
def box_iou(bboxes1, bboxes2, box_format='xywh', do_ioa=False, do_giou=False):
    """ Calculates the IOU (intersection over union) between two arrays of boxes.
    Allows variable box formats ('xywh' and 'x0y0x1y1').
    If do_ioa (intersection over area), then calculates the intersection over the area of boxes1 - this is commonly
    used to determine if detections are within crowd ignore region.
    If do_giou (generalized intersection over union), then calculates giou.

    Returns an array of shape (len(bboxes1), len(bboxes2)). The inputs are not modified.
    Raises an Exception if box_format is neither 'xywh' nor 'x0y0x1y1' (checked only for non-empty inputs).
    """
    if len(bboxes1) == 0 or len(bboxes2) == 0:
        return np.zeros((len(bboxes1), len(bboxes2)))

    # Exact comparison: a substring test would silently accept e.g. '' or 'xy'.
    if box_format not in ('xywh', 'x0y0x1y1'):
        raise (Exception('box_format %s is not implemented' % box_format))

    # Work on float copies: the caller's arrays stay untouched, and integer boxes no longer produce integer
    # buffers in which the ratios below would be truncated.
    bboxes1 = np.array(bboxes1, dtype=float)
    bboxes2 = np.array(bboxes2, dtype=float)

    if box_format == 'xywh':
        # layout: (x0, y0, w, h) -> (x0, y0, x1, y1)
        bboxes1[:, 2:4] += bboxes1[:, 0:2]
        bboxes2[:, 2:4] += bboxes2[:, 0:2]

    eps = np.finfo('float').eps

    # layout: (x0, y0, x1, y1)
    min_ = np.minimum(bboxes1[:, np.newaxis, :], bboxes2[np.newaxis, :, :])
    max_ = np.maximum(bboxes1[:, np.newaxis, :], bboxes2[np.newaxis, :, :])
    intersection = np.maximum(min_[..., 2] - max_[..., 0], 0) * np.maximum(min_[..., 3] - max_[..., 1], 0)
    area1 = (bboxes1[..., 2] - bboxes1[..., 0]) * (bboxes1[..., 3] - bboxes1[..., 1])

    if do_ioa:
        # Rows belonging to boxes with (numerically) zero area stay at 0.
        ioas = np.zeros_like(intersection)
        valid_mask = area1 > 0 + eps
        ioas[valid_mask, :] = intersection[valid_mask, :] / area1[valid_mask][:, np.newaxis]
        return ioas

    area2 = (bboxes2[..., 2] - bboxes2[..., 0]) * (bboxes2[..., 3] - bboxes2[..., 1])
    union = area1[:, np.newaxis] + area2[np.newaxis, :] - intersection
    # Degenerate boxes / unions yield an IoU of 0 instead of a division by zero.
    intersection[area1 <= 0 + eps, :] = 0
    intersection[:, area2 <= 0 + eps] = 0
    intersection[union <= 0 + eps] = 0
    union[union <= 0 + eps] = 1
    ious = intersection / union

    if do_giou:
        enclosing_area = np.maximum(max_[..., 2] - min_[..., 0], 0) * np.maximum(max_[..., 3] - min_[..., 1], 0)
        giou_eps = 1e-7
        # giou
        ious = ious - ((enclosing_area - union) / (enclosing_area + giou_eps))

    return ious
def match(match_scores):
    """ Runs scipy's linear_sum_assignment on the negated score matrix.

    Negating the scores turns the cost-minimising solver into one that maximises the total score.
    Returns the (row indices, column indices) of the assignment.
    """
    costs = -match_scores
    assignment = linear_sum_assignment(costs)
    return assignment[0], assignment[1]
def write_seq(output_data, out_file):
    """ Writes the rows of 'output_data' to 'out_file' as space-delimited text, one row per line.

    The directory of 'out_file' is created if needed. 'out_file' may also be a bare filename, in which case
    no directory is created.
    """
    out_loc = os.path.dirname(out_file)
    # dirname is '' for a bare filename, and os.makedirs('') raises.
    if out_loc:
        os.makedirs(out_loc, exist_ok=True)
    # The context manager closes the file even if writing a row fails.
    with open(out_file, 'w', newline='') as fp:
        writer = csv.writer(fp, delimiter=' ')
        writer.writerows(output_data)
def combine_classes(data):
    """ Converts data from a class-separated to a class-combined format.
    Input format: data['cls'][t] = {'ids', 'scores', 'im_hs', 'im_ws', 'mask_rles'}
    Output format: data[t] = {'ids', 'scores', 'im_hs', 'im_ws', 'mask_rles', 'cls'}

    The number of timesteps is taken from the first class in 'data'. Every output field is a numpy array, and
    'cls' holds, for each combined detection, the class it came from. Empty 'data' gives an empty list.
    """
    if not data:
        return []

    num_timesteps = len(next(iter(data.values())))
    output_data = [{} for _ in range(num_timesteps)]
    for cls, cls_data in data.items():
        for timestep, t_data in enumerate(cls_data):
            combined = output_data[timestep]
            for k, values in t_data.items():
                combined.setdefault(k, []).extend(values)
            # One class label per detection of THIS class; using the accumulated id count here would add too
            # many labels for every class after the first.
            combined.setdefault('cls', []).extend([cls] * len(t_data['ids']))

    for timestep, t_data in enumerate(output_data):
        output_data[timestep] = {k: np.array(v) for k, v in t_data.items()}
    return output_data
def save_as_png(t_data, out_file, im_h, im_w):
    """ Save a set of segmentation masks into a PNG format, the same as used for the DAVIS dataset.

    Each mask is painted into a single image with the value id + 1 (0 is left for pixels no mask covers);
    masks later in t_data overwrite earlier ones where they overlap. The image is cast to uint8 and saved as a
    palette image using the pascal colormap. im_h and im_w are only used to size the image when t_data holds
    no masks. The directory of 'out_file' is created if needed.
    """
    if len(t_data['mask_rles']) > 0:
        coco_masks = create_coco_mask(t_data['mask_rles'], t_data['im_hs'], t_data['im_ws'])
        list_of_np_masks = [mask_utils.decode(mask) for mask in coco_masks]

        png = np.zeros((t_data['im_hs'][0], t_data['im_ws'][0]))
        for mask, c_id in zip(list_of_np_masks, t_data['ids']):
            png[mask.astype("bool")] = c_id + 1
    else:
        png = np.zeros((im_h, im_w))

    out_dir = os.path.dirname(out_file)
    # dirname is '' for a bare filename, and os.makedirs('') raises; exist_ok avoids failing when the
    # directory appears between the check and the creation.
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    colmap = (np.array(pascal_colormap) * 255).round().astype("uint8")
    palimage = Image.new('P', (16, 16))
    palimage.putpalette(colmap)
    im = Image.fromarray(np.squeeze(png.astype("uint8")))
    im2 = im.quantize(palette=palimage)
    im2.save(out_file)
def get_frame_size(data):
    """ Gets frame height and width from data.

    Returns (im_h, im_w) taken from the first detection of the first timestep (over all classes, in iteration
    order) that has any detections, or None if no timestep has one.
    """
    for cls_data in data.values():
        for t_data in cls_data:
            # Compare the LENGTH with 0; 'len(x > 0)' only worked by accident for numpy arrays.
            if len(t_data['im_hs']) > 0:
                return t_data['im_hs'][0], t_data['im_ws'][0]
    return None