Spaces:
Runtime error
Runtime error
File size: 10,226 Bytes
9667e74 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
import math
import numpy as np
from operator import itemgetter
BODY_PARTS_KPT_IDS = [[1, 2], [1, 5], [2, 3], [3, 4], [5, 6], [6, 7], [1, 8], [8, 9], [9, 10], [1, 11],
[11, 12], [12, 13], [1, 0], [0, 14], [14, 16], [0, 15], [15, 17], [2, 16], [5, 17]]
BODY_PARTS_PAF_IDS = ([12, 13], [20, 21], [14, 15], [16, 17], [22, 23], [24, 25], [0, 1], [2, 3], [4, 5],
[6, 7], [8, 9], [10, 11], [28, 29], [30, 31], [34, 35], [32, 33], [36, 37], [18, 19], [26, 27])
def linspace2d(start, stop, n=10):
points = 1 / (n - 1) * (stop - start)
return points[:, None] * np.arange(n) + start[:, None]
def extract_keypoints(heatmap, all_keypoints, total_keypoint_num):
heatmap[heatmap < 0.1] = 0
heatmap_with_borders = np.pad(heatmap, [(2, 2), (2, 2)], mode='constant')
heatmap_center = heatmap_with_borders[1:heatmap_with_borders.shape[0]-1, 1:heatmap_with_borders.shape[1]-1]
heatmap_left = heatmap_with_borders[1:heatmap_with_borders.shape[0]-1, 2:heatmap_with_borders.shape[1]]
heatmap_right = heatmap_with_borders[1:heatmap_with_borders.shape[0]-1, 0:heatmap_with_borders.shape[1]-2]
heatmap_up = heatmap_with_borders[2:heatmap_with_borders.shape[0], 1:heatmap_with_borders.shape[1]-1]
heatmap_down = heatmap_with_borders[0:heatmap_with_borders.shape[0]-2, 1:heatmap_with_borders.shape[1]-1]
heatmap_peaks = (heatmap_center > heatmap_left) &\
(heatmap_center > heatmap_right) &\
(heatmap_center > heatmap_up) &\
(heatmap_center > heatmap_down)
heatmap_peaks = heatmap_peaks[1:heatmap_center.shape[0]-1, 1:heatmap_center.shape[1]-1]
keypoints = list(zip(np.nonzero(heatmap_peaks)[1], np.nonzero(heatmap_peaks)[0])) # (w, h)
keypoints = sorted(keypoints, key=itemgetter(0))
suppressed = np.zeros(len(keypoints), np.uint8)
keypoints_with_score_and_id = []
keypoint_num = 0
for i in range(len(keypoints)):
if suppressed[i]:
continue
for j in range(i+1, len(keypoints)):
if math.sqrt((keypoints[i][0] - keypoints[j][0]) ** 2 +
(keypoints[i][1] - keypoints[j][1]) ** 2) < 6:
suppressed[j] = 1
keypoint_with_score_and_id = (keypoints[i][0], keypoints[i][1], heatmap[keypoints[i][1], keypoints[i][0]],
total_keypoint_num + keypoint_num)
keypoints_with_score_and_id.append(keypoint_with_score_and_id)
keypoint_num += 1
all_keypoints.append(keypoints_with_score_and_id)
return keypoint_num
def group_keypoints(all_keypoints_by_type, pafs, pose_entry_size=20, min_paf_score=0.05, demo=False):
pose_entries = []
all_keypoints = np.array([item for sublist in all_keypoints_by_type for item in sublist])
for part_id in range(len(BODY_PARTS_PAF_IDS)):
part_pafs = pafs[:, :, BODY_PARTS_PAF_IDS[part_id]]
kpts_a = all_keypoints_by_type[BODY_PARTS_KPT_IDS[part_id][0]]
kpts_b = all_keypoints_by_type[BODY_PARTS_KPT_IDS[part_id][1]]
num_kpts_a = len(kpts_a)
num_kpts_b = len(kpts_b)
kpt_a_id = BODY_PARTS_KPT_IDS[part_id][0]
kpt_b_id = BODY_PARTS_KPT_IDS[part_id][1]
if num_kpts_a == 0 and num_kpts_b == 0: # no keypoints for such body part
continue
elif num_kpts_a == 0: # body part has just 'b' keypoints
for i in range(num_kpts_b):
num = 0
for j in range(len(pose_entries)): # check if already in some pose, was added by another body part
if pose_entries[j][kpt_b_id] == kpts_b[i][3]:
num += 1
continue
if num == 0:
pose_entry = np.ones(pose_entry_size) * -1
pose_entry[kpt_b_id] = kpts_b[i][3] # keypoint idx
pose_entry[-1] = 1 # num keypoints in pose
pose_entry[-2] = kpts_b[i][2] # pose score
pose_entries.append(pose_entry)
continue
elif num_kpts_b == 0: # body part has just 'a' keypoints
for i in range(num_kpts_a):
num = 0
for j in range(len(pose_entries)):
if pose_entries[j][kpt_a_id] == kpts_a[i][3]:
num += 1
continue
if num == 0:
pose_entry = np.ones(pose_entry_size) * -1
pose_entry[kpt_a_id] = kpts_a[i][3]
pose_entry[-1] = 1
pose_entry[-2] = kpts_a[i][2]
pose_entries.append(pose_entry)
continue
connections = []
for i in range(num_kpts_a):
kpt_a = np.array(kpts_a[i][0:2])
for j in range(num_kpts_b):
kpt_b = np.array(kpts_b[j][0:2])
mid_point = [(), ()]
mid_point[0] = (int(round((kpt_a[0] + kpt_b[0]) * 0.5)),
int(round((kpt_a[1] + kpt_b[1]) * 0.5)))
mid_point[1] = mid_point[0]
vec = [kpt_b[0] - kpt_a[0], kpt_b[1] - kpt_a[1]]
vec_norm = math.sqrt(vec[0] ** 2 + vec[1] ** 2)
if vec_norm == 0:
continue
vec[0] /= vec_norm
vec[1] /= vec_norm
cur_point_score = (vec[0] * part_pafs[mid_point[0][1], mid_point[0][0], 0] +
vec[1] * part_pafs[mid_point[1][1], mid_point[1][0], 1])
height_n = pafs.shape[0] // 2
success_ratio = 0
point_num = 10 # number of points to integration over paf
if cur_point_score > -100:
passed_point_score = 0
passed_point_num = 0
x, y = linspace2d(kpt_a, kpt_b)
for point_idx in range(point_num):
if not demo:
px = int(round(x[point_idx]))
py = int(round(y[point_idx]))
else:
px = int(x[point_idx])
py = int(y[point_idx])
paf = part_pafs[py, px, 0:2]
cur_point_score = vec[0] * paf[0] + vec[1] * paf[1]
if cur_point_score > min_paf_score:
passed_point_score += cur_point_score
passed_point_num += 1
success_ratio = passed_point_num / point_num
ratio = 0
if passed_point_num > 0:
ratio = passed_point_score / passed_point_num
ratio += min(height_n / vec_norm - 1, 0)
if ratio > 0 and success_ratio > 0.8:
score_all = ratio + kpts_a[i][2] + kpts_b[j][2]
connections.append([i, j, ratio, score_all])
if len(connections) > 0:
connections = sorted(connections, key=itemgetter(2), reverse=True)
num_connections = min(num_kpts_a, num_kpts_b)
has_kpt_a = np.zeros(num_kpts_a, dtype=np.int32)
has_kpt_b = np.zeros(num_kpts_b, dtype=np.int32)
filtered_connections = []
for row in range(len(connections)):
if len(filtered_connections) == num_connections:
break
i, j, cur_point_score = connections[row][0:3]
if not has_kpt_a[i] and not has_kpt_b[j]:
filtered_connections.append([kpts_a[i][3], kpts_b[j][3], cur_point_score])
has_kpt_a[i] = 1
has_kpt_b[j] = 1
connections = filtered_connections
if len(connections) == 0:
continue
if part_id == 0:
pose_entries = [np.ones(pose_entry_size) * -1 for _ in range(len(connections))]
for i in range(len(connections)):
pose_entries[i][BODY_PARTS_KPT_IDS[0][0]] = connections[i][0]
pose_entries[i][BODY_PARTS_KPT_IDS[0][1]] = connections[i][1]
pose_entries[i][-1] = 2
pose_entries[i][-2] = np.sum(all_keypoints[connections[i][0:2], 2]) + connections[i][2]
elif part_id == 17 or part_id == 18:
kpt_a_id = BODY_PARTS_KPT_IDS[part_id][0]
kpt_b_id = BODY_PARTS_KPT_IDS[part_id][1]
for i in range(len(connections)):
for j in range(len(pose_entries)):
if pose_entries[j][kpt_a_id] == connections[i][0] and pose_entries[j][kpt_b_id] == -1:
pose_entries[j][kpt_b_id] = connections[i][1]
elif pose_entries[j][kpt_b_id] == connections[i][1] and pose_entries[j][kpt_a_id] == -1:
pose_entries[j][kpt_a_id] = connections[i][0]
continue
else:
kpt_a_id = BODY_PARTS_KPT_IDS[part_id][0]
kpt_b_id = BODY_PARTS_KPT_IDS[part_id][1]
for i in range(len(connections)):
num = 0
for j in range(len(pose_entries)):
if pose_entries[j][kpt_a_id] == connections[i][0]:
pose_entries[j][kpt_b_id] = connections[i][1]
num += 1
pose_entries[j][-1] += 1
pose_entries[j][-2] += all_keypoints[connections[i][1], 2] + connections[i][2]
if num == 0:
pose_entry = np.ones(pose_entry_size) * -1
pose_entry[kpt_a_id] = connections[i][0]
pose_entry[kpt_b_id] = connections[i][1]
pose_entry[-1] = 2
pose_entry[-2] = np.sum(all_keypoints[connections[i][0:2], 2]) + connections[i][2]
pose_entries.append(pose_entry)
filtered_entries = []
for i in range(len(pose_entries)):
if pose_entries[i][-1] < 3 or (pose_entries[i][-2] / pose_entries[i][-1] < 0.2):
continue
filtered_entries.append(pose_entries[i])
pose_entries = np.asarray(filtered_entries)
return pose_entries, all_keypoints
|