Spaces:
Runtime error
Runtime error
import math | |
import numpy as np | |
from operator import itemgetter | |
BODY_PARTS_KPT_IDS = [[1, 2], [1, 5], [2, 3], [3, 4], [5, 6], [6, 7], [1, 8], [8, 9], [9, 10], [1, 11], | |
[11, 12], [12, 13], [1, 0], [0, 14], [14, 16], [0, 15], [15, 17], [2, 16], [5, 17]] | |
BODY_PARTS_PAF_IDS = ([12, 13], [20, 21], [14, 15], [16, 17], [22, 23], [24, 25], [0, 1], [2, 3], [4, 5], | |
[6, 7], [8, 9], [10, 11], [28, 29], [30, 31], [34, 35], [32, 33], [36, 37], [18, 19], [26, 27]) | |
def linspace2d(start, stop, n=10): | |
points = 1 / (n - 1) * (stop - start) | |
return points[:, None] * np.arange(n) + start[:, None] | |
def extract_keypoints(heatmap, all_keypoints, total_keypoint_num): | |
heatmap[heatmap < 0.1] = 0 | |
heatmap_with_borders = np.pad(heatmap, [(2, 2), (2, 2)], mode='constant') | |
heatmap_center = heatmap_with_borders[1:heatmap_with_borders.shape[0]-1, 1:heatmap_with_borders.shape[1]-1] | |
heatmap_left = heatmap_with_borders[1:heatmap_with_borders.shape[0]-1, 2:heatmap_with_borders.shape[1]] | |
heatmap_right = heatmap_with_borders[1:heatmap_with_borders.shape[0]-1, 0:heatmap_with_borders.shape[1]-2] | |
heatmap_up = heatmap_with_borders[2:heatmap_with_borders.shape[0], 1:heatmap_with_borders.shape[1]-1] | |
heatmap_down = heatmap_with_borders[0:heatmap_with_borders.shape[0]-2, 1:heatmap_with_borders.shape[1]-1] | |
heatmap_peaks = (heatmap_center > heatmap_left) &\ | |
(heatmap_center > heatmap_right) &\ | |
(heatmap_center > heatmap_up) &\ | |
(heatmap_center > heatmap_down) | |
heatmap_peaks = heatmap_peaks[1:heatmap_center.shape[0]-1, 1:heatmap_center.shape[1]-1] | |
keypoints = list(zip(np.nonzero(heatmap_peaks)[1], np.nonzero(heatmap_peaks)[0])) # (w, h) | |
keypoints = sorted(keypoints, key=itemgetter(0)) | |
suppressed = np.zeros(len(keypoints), np.uint8) | |
keypoints_with_score_and_id = [] | |
keypoint_num = 0 | |
for i in range(len(keypoints)): | |
if suppressed[i]: | |
continue | |
for j in range(i+1, len(keypoints)): | |
if math.sqrt((keypoints[i][0] - keypoints[j][0]) ** 2 + | |
(keypoints[i][1] - keypoints[j][1]) ** 2) < 6: | |
suppressed[j] = 1 | |
keypoint_with_score_and_id = (keypoints[i][0], keypoints[i][1], heatmap[keypoints[i][1], keypoints[i][0]], | |
total_keypoint_num + keypoint_num) | |
keypoints_with_score_and_id.append(keypoint_with_score_and_id) | |
keypoint_num += 1 | |
all_keypoints.append(keypoints_with_score_and_id) | |
return keypoint_num | |
def group_keypoints(all_keypoints_by_type, pafs, pose_entry_size=20, min_paf_score=0.05, demo=False): | |
pose_entries = [] | |
all_keypoints = np.array([item for sublist in all_keypoints_by_type for item in sublist]) | |
for part_id in range(len(BODY_PARTS_PAF_IDS)): | |
part_pafs = pafs[:, :, BODY_PARTS_PAF_IDS[part_id]] | |
kpts_a = all_keypoints_by_type[BODY_PARTS_KPT_IDS[part_id][0]] | |
kpts_b = all_keypoints_by_type[BODY_PARTS_KPT_IDS[part_id][1]] | |
num_kpts_a = len(kpts_a) | |
num_kpts_b = len(kpts_b) | |
kpt_a_id = BODY_PARTS_KPT_IDS[part_id][0] | |
kpt_b_id = BODY_PARTS_KPT_IDS[part_id][1] | |
if num_kpts_a == 0 and num_kpts_b == 0: # no keypoints for such body part | |
continue | |
elif num_kpts_a == 0: # body part has just 'b' keypoints | |
for i in range(num_kpts_b): | |
num = 0 | |
for j in range(len(pose_entries)): # check if already in some pose, was added by another body part | |
if pose_entries[j][kpt_b_id] == kpts_b[i][3]: | |
num += 1 | |
continue | |
if num == 0: | |
pose_entry = np.ones(pose_entry_size) * -1 | |
pose_entry[kpt_b_id] = kpts_b[i][3] # keypoint idx | |
pose_entry[-1] = 1 # num keypoints in pose | |
pose_entry[-2] = kpts_b[i][2] # pose score | |
pose_entries.append(pose_entry) | |
continue | |
elif num_kpts_b == 0: # body part has just 'a' keypoints | |
for i in range(num_kpts_a): | |
num = 0 | |
for j in range(len(pose_entries)): | |
if pose_entries[j][kpt_a_id] == kpts_a[i][3]: | |
num += 1 | |
continue | |
if num == 0: | |
pose_entry = np.ones(pose_entry_size) * -1 | |
pose_entry[kpt_a_id] = kpts_a[i][3] | |
pose_entry[-1] = 1 | |
pose_entry[-2] = kpts_a[i][2] | |
pose_entries.append(pose_entry) | |
continue | |
connections = [] | |
for i in range(num_kpts_a): | |
kpt_a = np.array(kpts_a[i][0:2]) | |
for j in range(num_kpts_b): | |
kpt_b = np.array(kpts_b[j][0:2]) | |
mid_point = [(), ()] | |
mid_point[0] = (int(round((kpt_a[0] + kpt_b[0]) * 0.5)), | |
int(round((kpt_a[1] + kpt_b[1]) * 0.5))) | |
mid_point[1] = mid_point[0] | |
vec = [kpt_b[0] - kpt_a[0], kpt_b[1] - kpt_a[1]] | |
vec_norm = math.sqrt(vec[0] ** 2 + vec[1] ** 2) | |
if vec_norm == 0: | |
continue | |
vec[0] /= vec_norm | |
vec[1] /= vec_norm | |
cur_point_score = (vec[0] * part_pafs[mid_point[0][1], mid_point[0][0], 0] + | |
vec[1] * part_pafs[mid_point[1][1], mid_point[1][0], 1]) | |
height_n = pafs.shape[0] // 2 | |
success_ratio = 0 | |
point_num = 10 # number of points to integration over paf | |
if cur_point_score > -100: | |
passed_point_score = 0 | |
passed_point_num = 0 | |
x, y = linspace2d(kpt_a, kpt_b) | |
for point_idx in range(point_num): | |
if not demo: | |
px = int(round(x[point_idx])) | |
py = int(round(y[point_idx])) | |
else: | |
px = int(x[point_idx]) | |
py = int(y[point_idx]) | |
paf = part_pafs[py, px, 0:2] | |
cur_point_score = vec[0] * paf[0] + vec[1] * paf[1] | |
if cur_point_score > min_paf_score: | |
passed_point_score += cur_point_score | |
passed_point_num += 1 | |
success_ratio = passed_point_num / point_num | |
ratio = 0 | |
if passed_point_num > 0: | |
ratio = passed_point_score / passed_point_num | |
ratio += min(height_n / vec_norm - 1, 0) | |
if ratio > 0 and success_ratio > 0.8: | |
score_all = ratio + kpts_a[i][2] + kpts_b[j][2] | |
connections.append([i, j, ratio, score_all]) | |
if len(connections) > 0: | |
connections = sorted(connections, key=itemgetter(2), reverse=True) | |
num_connections = min(num_kpts_a, num_kpts_b) | |
has_kpt_a = np.zeros(num_kpts_a, dtype=np.int32) | |
has_kpt_b = np.zeros(num_kpts_b, dtype=np.int32) | |
filtered_connections = [] | |
for row in range(len(connections)): | |
if len(filtered_connections) == num_connections: | |
break | |
i, j, cur_point_score = connections[row][0:3] | |
if not has_kpt_a[i] and not has_kpt_b[j]: | |
filtered_connections.append([kpts_a[i][3], kpts_b[j][3], cur_point_score]) | |
has_kpt_a[i] = 1 | |
has_kpt_b[j] = 1 | |
connections = filtered_connections | |
if len(connections) == 0: | |
continue | |
if part_id == 0: | |
pose_entries = [np.ones(pose_entry_size) * -1 for _ in range(len(connections))] | |
for i in range(len(connections)): | |
pose_entries[i][BODY_PARTS_KPT_IDS[0][0]] = connections[i][0] | |
pose_entries[i][BODY_PARTS_KPT_IDS[0][1]] = connections[i][1] | |
pose_entries[i][-1] = 2 | |
pose_entries[i][-2] = np.sum(all_keypoints[connections[i][0:2], 2]) + connections[i][2] | |
elif part_id == 17 or part_id == 18: | |
kpt_a_id = BODY_PARTS_KPT_IDS[part_id][0] | |
kpt_b_id = BODY_PARTS_KPT_IDS[part_id][1] | |
for i in range(len(connections)): | |
for j in range(len(pose_entries)): | |
if pose_entries[j][kpt_a_id] == connections[i][0] and pose_entries[j][kpt_b_id] == -1: | |
pose_entries[j][kpt_b_id] = connections[i][1] | |
elif pose_entries[j][kpt_b_id] == connections[i][1] and pose_entries[j][kpt_a_id] == -1: | |
pose_entries[j][kpt_a_id] = connections[i][0] | |
continue | |
else: | |
kpt_a_id = BODY_PARTS_KPT_IDS[part_id][0] | |
kpt_b_id = BODY_PARTS_KPT_IDS[part_id][1] | |
for i in range(len(connections)): | |
num = 0 | |
for j in range(len(pose_entries)): | |
if pose_entries[j][kpt_a_id] == connections[i][0]: | |
pose_entries[j][kpt_b_id] = connections[i][1] | |
num += 1 | |
pose_entries[j][-1] += 1 | |
pose_entries[j][-2] += all_keypoints[connections[i][1], 2] + connections[i][2] | |
if num == 0: | |
pose_entry = np.ones(pose_entry_size) * -1 | |
pose_entry[kpt_a_id] = connections[i][0] | |
pose_entry[kpt_b_id] = connections[i][1] | |
pose_entry[-1] = 2 | |
pose_entry[-2] = np.sum(all_keypoints[connections[i][0:2], 2]) + connections[i][2] | |
pose_entries.append(pose_entry) | |
filtered_entries = [] | |
for i in range(len(pose_entries)): | |
if pose_entries[i][-1] < 3 or (pose_entries[i][-2] / pose_entries[i][-1] < 0.2): | |
continue | |
filtered_entries.append(pose_entries[i]) | |
pose_entries = np.asarray(filtered_entries) | |
return pose_entries, all_keypoints | |