"""3D human-pose utilities: pythreejs rendering helpers and OpenPose-style
2D/3D pose extraction, grouping, tracking, and smoothing."""

import math
from operator import itemgetter

import cv2
import numpy as np
from pythreejs import *


class Engine3js:
    """
    The implementation of these interfaces depends on Pythreejs, so make sure
    you install and authorize the Jupyter Widgets plug-in correctly.

    Attributes:
        view_width, view_height: Size of the view window.
        position:    Position of the camera.
        lookAtPos:   The point at which the camera looks at.
        axis_size:   The size of axis.
        grid_length: grid size, length == width.
        grid_num:    grid number.
        grid:        If use grid.
        axis:        If use axis.
    """

    def __init__(
        self,
        view_width=455,
        view_height=256,
        position=None,
        lookAtPos=None,
        axis_size=300,
        grid_length=600,
        grid_num=20,
        grid=False,
        axis=False,
    ):
        # Avoid mutable default arguments; these are the original defaults.
        if position is None:
            position = [300, 100, 0]
        if lookAtPos is None:
            lookAtPos = [0, 0, 0]

        self.view_width = view_width
        self.view_height = view_height
        self.position = position

        # set the camera
        self.cam = PerspectiveCamera(position=self.position, aspect=self.view_width / self.view_height)
        self.cam.lookAt(lookAtPos)

        # x,y,z axis
        self.axis = AxesHelper(axis_size)  # axes length

        # set grid size
        self.gridHelper = GridHelper(grid_length, grid_num)

        # set scene
        self.scene = Scene(
            children=[
                self.cam,
                DirectionalLight(position=[3, 5, 1], intensity=0.6),
                AmbientLight(intensity=0.5),
            ]
        )
        # add axis or grid
        if grid:
            self.scene.add(self.gridHelper)
        if axis:
            self.scene.add(self.axis)

        # render the objects in scene
        self.renderer = Renderer(
            camera=self.cam,
            scene=self.scene,
            controls=[OrbitControls(controlling=self.cam)],
            width=self.view_width,
            height=self.view_height,
        )
        # display(renderer4)

    def get_width(self):
        """Return the width of the view window in pixels."""
        return self.view_width

    def plot(self):
        """Render the current scene with the current camera."""
        self.renderer.render(self.scene, self.cam)

    def scene_add(self, object):
        """Add a pythreejs object to the scene."""
        self.scene.add(object)

    def scene_remove(self, object):
        """Remove a pythreejs object from the scene."""
        self.scene.remove(object)


class Geometry:
    """
    This is the geometry base class that defines buffer and material.
    """

    def __init__(self, name="geometry"):
        self.geometry = None
        self.material = None
        self.name = name

    def get_Name(self):
        # Fixed: original definition was missing `self`, so any call raised
        # a TypeError/NameError.
        return self.name


class Skeleton(Geometry):
    """
    This is the class for drawing human body poses.
    """

    def __init__(self, name="skeleton", lineWidth=3, body_edges=None):
        super(Skeleton, self).__init__(name)
        self.material = LineBasicMaterial(vertexColors="VertexColors", linewidth=lineWidth)
        # Per-vertex colors for the limb line segments (19 vertices).
        self.colorSet = BufferAttribute(
            np.array(
                [
                    [1, 0, 0],
                    [1, 0, 0],
                    [0, 1, 0],
                    [0, 0, 1],
                    [1, 0, 0],
                    [1, 0, 0],
                    [0, 1, 0],
                    [0, 0, 1],
                    [1, 0, 0],
                    [1, 0, 0],
                    [0, 1, 0],
                    [0, 0, 1],
                    [1, 0, 0],
                    [1, 0, 0],
                    [0, 1, 0],
                    [0, 0, 1],
                    [1, 0, 0],
                    [1, 0, 0],
                    [0, 1, 0],
                ],
                dtype=np.float32,
            ),
            normalized=False,
        )
        # self.geometry.attributes["color"] = self.colorSet
        # Avoid a shared mutable default for body_edges.
        self.body_edges = [] if body_edges is None else body_edges

    def __call__(self, poses_3d):
        """Build one LineSegments object per pose in `poses_3d`."""
        poses = []
        for pose_position_tmp in poses_3d:
            bones = []
            for edge in self.body_edges:
                # put pair of points as limbs
                bones.append(pose_position_tmp[edge[0]])
                bones.append(pose_position_tmp[edge[1]])
            bones = np.asarray(bones, dtype=np.float32)

            # You can find the api in https://github.com/jupyter-widgets/pythreejs
            self.geometry = BufferGeometry(
                attributes={
                    "position": BufferAttribute(bones, normalized=False),
                    # It defines limbs' color
                    "color": self.colorSet,
                }
            )
            pose = LineSegments(self.geometry, self.material)
            poses.append(pose)
            # self.geometry.close()
        return poses

    def plot(self, pose_points=None):
        """Alias for __call__ so the object can be used like Engine3js.plot."""
        return self.__call__(pose_points)


class Cloudpoint(Geometry):
    """
    This is the class for drawing cloud points.
    """

    def __init__(self, name="cloudpoint", points=None, point_size=5, line=None, points_color="blue"):
        super(Cloudpoint, self).__init__(name)
        self.material = PointsMaterial(size=point_size, color=points_color)
        # Avoid a shared mutable default for points.
        self.points = [] if points is None else points
        self.line = line

    def __call__(self, points_3d):
        """Build a Points object from `points_3d`; if `self.line` is set,
        also build a red LineSegments mesh and return both."""
        self.geometry = BufferGeometry(
            attributes={
                "position": BufferAttribute(points_3d, normalized=False),
                # It defines points' vertices' color
                "color": BufferAttribute(
                    np.array(
                        [
                            [1, 0, 0],
                            [1, 0, 0],
                            [0, 1, 0],
                            [0, 0, 1],
                            [1, 0, 0],
                            [1, 0, 0],
                            [0, 1, 0],
                            [0, 0, 1],
                            [1, 0, 0],
                            [1, 0, 0],
                            [0, 1, 0],
                            [0, 0, 1],
                            [1, 0, 0],
                            [1, 0, 0],
                            [0, 1, 0],
                            [0, 0, 1],
                            [1, 0, 0],
                            [1, 0, 0],
                            [0, 1, 0],
                        ],
                        dtype=np.float32,
                    ),
                    normalized=False,
                ),
            },
        )
        cloud_points = Points(self.geometry, self.material)

        if self.line is not None:
            g1 = BufferGeometry(
                attributes={
                    # Fixed: original referenced undefined name `line`.
                    "position": BufferAttribute(self.line, normalized=False),
                    # It defines limbs' color
                    "color": BufferAttribute(
                        # Here you can set vertex colors, if you set the 'color' option = vertexes
                        np.array(
                            [
                                [1, 0, 0],
                                [1, 0, 0],
                                [0, 1, 0],
                                [0, 0, 1],
                                [1, 0, 0],
                                [1, 0, 0],
                                [0, 1, 0],
                                [0, 0, 1],
                                [1, 0, 0],
                                [1, 0, 0],
                                [0, 1, 0],
                                [0, 0, 1],
                                [1, 0, 0],
                                [1, 0, 0],
                                [0, 1, 0],
                                [0, 0, 1],
                                [1, 0, 0],
                                [1, 0, 0],
                                [0, 1, 0],
                            ],
                            dtype=np.float32,
                        ),
                        normalized=False,
                    ),
                },
            )
            m1 = LineBasicMaterial(color="red", linewidth=3)
            facemesh = LineSegments(g1, m1)
            return [cloud_points, facemesh]

        return cloud_points


class Box_bounding(Geometry):
    """Placeholder geometry for drawing 3D bounding boxes.

    Fixed: original declared this with `def` instead of `class`, which made
    `super(...)`/`self` usage impossible.
    """

    def __init__(self, name="Box", lineWidth=3):
        super(Box_bounding, self).__init__(name)
        self.material = LineBasicMaterial(vertexColors="VertexColors", linewidth=lineWidth)
        self.edge = []

    def __call__(self, points=None):
        # Not implemented yet.
        pass


class Pose:
    """A single tracked 2D pose with keypoints, confidence and an id."""

    num_kpts = 18
    kpt_names = [
        "neck",
        "nose",
        "l_sho",
        "l_elb",
        "l_wri",
        "l_hip",
        "l_knee",
        "l_ank",
        "r_sho",
        "r_elb",
        "r_wri",
        "r_hip",
        "r_knee",
        "r_ank",
        "r_eye",
        "l_eye",
        "r_ear",
        "l_ear",
    ]
    # Per-keypoint tolerance (OKS-style), scaled by 1/10.
    sigmas = (
        np.array(
            [
                0.79,
                0.26,
                0.79,
                0.72,
                0.62,
                1.07,
                0.87,
                0.89,
                0.79,
                0.72,
                0.62,
                1.07,
                0.87,
                0.89,
                0.25,
                0.25,
                0.35,
                0.35,
            ],
            dtype=np.float32,
        )
        / 10.0
    )
    vars = (sigmas * 2) ** 2
    last_id = -1
    color = [0, 224, 255]

    def __init__(self, keypoints, confidence):
        super().__init__()
        self.keypoints = keypoints
        self.confidence = confidence
        # Collect only found keypoints (x != -1) to compute the bounding box.
        found_keypoints = np.zeros((np.count_nonzero(keypoints[:, 0] != -1), 2), dtype=np.int32)
        found_kpt_id = 0
        for kpt_id in range(keypoints.shape[0]):
            if keypoints[kpt_id, 0] == -1:
                continue
            found_keypoints[found_kpt_id] = keypoints[kpt_id]
            found_kpt_id += 1
        self.bbox = cv2.boundingRect(found_keypoints)
        self.id = None
        # One filter per translation coordinate (x, y, z).
        self.translation_filter = [
            OneEuroFilter(freq=80, beta=0.01),
            OneEuroFilter(freq=80, beta=0.01),
            OneEuroFilter(freq=80, beta=0.01),
        ]

    def update_id(self, id=None):
        """Assign `id`, or a fresh auto-incremented id when None."""
        self.id = id
        if self.id is None:
            self.id = Pose.last_id + 1
            Pose.last_id += 1

    def filter(self, translation):
        """Smooth a 3-component translation through the per-axis filters."""
        filtered_translation = []
        for coordinate_id in range(3):
            filtered_translation.append(self.translation_filter[coordinate_id](translation[coordinate_id]))
        return filtered_translation


def get_similarity(a, b, threshold=0.5):
    """Count keypoints of poses `a` and `b` whose OKS similarity exceeds `threshold`."""
    num_similar_kpt = 0
    for kpt_id in range(Pose.num_kpts):
        if a.keypoints[kpt_id, 0] != -1 and b.keypoints[kpt_id, 0] != -1:
            distance = np.sum((a.keypoints[kpt_id] - b.keypoints[kpt_id]) ** 2)
            area = max(a.bbox[2] * a.bbox[3], b.bbox[2] * b.bbox[3])
            similarity = np.exp(-distance / (2 * (area + np.spacing(1)) * Pose.vars[kpt_id]))
            if similarity > threshold:
                num_similar_kpt += 1
    return num_similar_kpt


def propagate_ids(previous_poses, current_poses, threshold=3):
    """Propagate poses ids from previous frame results. Id is propagated,
    if there are at least `threshold` similar keypoints between pose from
    previous frame and current.

    :param previous_poses: poses from previous frame with ids
    :param current_poses: poses from current frame to assign ids
    :param threshold: minimal number of similar keypoints between poses
    :return: None
    """
    current_poses_sorted_ids = list(range(len(current_poses)))
    current_poses_sorted_ids = sorted(
        current_poses_sorted_ids,
        key=lambda pose_id: current_poses[pose_id].confidence,
        reverse=True,
    )  # match confident poses first
    mask = np.ones(len(previous_poses), dtype=np.int32)
    for current_pose_id in current_poses_sorted_ids:
        best_matched_id = None
        best_matched_pose_id = None
        best_matched_iou = 0
        for previous_pose_id in range(len(previous_poses)):
            if not mask[previous_pose_id]:
                continue
            iou = get_similarity(current_poses[current_pose_id], previous_poses[previous_pose_id])
            if iou > best_matched_iou:
                best_matched_iou = iou
                best_matched_pose_id = previous_poses[previous_pose_id].id
                best_matched_id = previous_pose_id
        if best_matched_iou >= threshold:
            mask[best_matched_id] = 0
        else:  # pose not similar to any previous
            best_matched_pose_id = None
        current_poses[current_pose_id].update_id(best_matched_pose_id)
        if best_matched_pose_id is not None:
            current_poses[current_pose_id].translation_filter = previous_poses[best_matched_id].translation_filter


AVG_PERSON_HEIGHT = 180

# pelvis (body center) is missing, id == 2
map_id_to_panoptic = [1, 0, 9, 10, 11, 3, 4, 5, 12, 13, 14, 6, 7, 8, 15, 16, 17, 18]

limbs = [[18, 17, 1], [16, 15, 1], [5, 4, 3], [8, 7, 6], [11, 10, 9], [14, 13, 12]]


def get_root_relative_poses(inference_results):
    """Extract 2D poses from heatmaps/PAFs and read root-relative 3D
    coordinates from the feature maps at the 2D keypoint locations."""
    features, heatmap, paf_map = inference_results

    upsample_ratio = 4
    found_poses = extract_poses(heatmap[0:-1], paf_map, upsample_ratio)[0]
    # scale coordinates to features space
    found_poses[:, 0:-1:3] /= upsample_ratio
    found_poses[:, 1:-1:3] /= upsample_ratio

    poses_2d = []
    num_kpt_panoptic = 19
    num_kpt = 18
    for pose_id in range(found_poses.shape[0]):
        if found_poses[pose_id, 5] == -1:  # skip pose if is not found neck
            continue
        pose_2d = np.ones(num_kpt_panoptic * 3 + 1, dtype=np.float32) * -1  # +1 for pose confidence
        for kpt_id in range(num_kpt):
            if found_poses[pose_id, kpt_id * 3 + 2] != -1:
                x_2d, y_2d = found_poses[pose_id, kpt_id * 3 : kpt_id * 3 + 2]
                conf = found_poses[pose_id, kpt_id * 3 + 2]
                pose_2d[map_id_to_panoptic[kpt_id] * 3] = x_2d  # just repacking
                pose_2d[map_id_to_panoptic[kpt_id] * 3 + 1] = y_2d
                pose_2d[map_id_to_panoptic[kpt_id] * 3 + 2] = conf
        pose_2d[-1] = found_poses[pose_id, -1]
        poses_2d.append(pose_2d)

    keypoint_treshold = 0.1
    poses_3d = np.ones((len(poses_2d), num_kpt_panoptic * 4), dtype=np.float32) * -1
    for pose_id in range(len(poses_3d)):
        if poses_2d[pose_id][2] > keypoint_treshold:
            neck_2d = poses_2d[pose_id][:2].astype(int)
            # read all pose coordinates at neck location
            for kpt_id in range(num_kpt_panoptic):
                map_3d = features[kpt_id * 3 : (kpt_id + 1) * 3]
                poses_3d[pose_id][kpt_id * 4] = map_3d[0, neck_2d[1], neck_2d[0]] * AVG_PERSON_HEIGHT
                poses_3d[pose_id][kpt_id * 4 + 1] = map_3d[1, neck_2d[1], neck_2d[0]] * AVG_PERSON_HEIGHT
                poses_3d[pose_id][kpt_id * 4 + 2] = map_3d[2, neck_2d[1], neck_2d[0]] * AVG_PERSON_HEIGHT
                poses_3d[pose_id][kpt_id * 4 + 3] = poses_2d[pose_id][kpt_id * 3 + 2]

            # refine keypoints coordinates at corresponding limbs locations
            for limb in limbs:
                for kpt_id_from in limb:
                    if poses_2d[pose_id][kpt_id_from * 3 + 2] > keypoint_treshold:
                        for kpt_id_where in limb:
                            kpt_from_2d = poses_2d[pose_id][kpt_id_from * 3 : kpt_id_from * 3 + 2].astype(int)
                            map_3d = features[kpt_id_where * 3 : (kpt_id_where + 1) * 3]
                            poses_3d[pose_id][kpt_id_where * 4] = map_3d[0, kpt_from_2d[1], kpt_from_2d[0]] * AVG_PERSON_HEIGHT
                            poses_3d[pose_id][kpt_id_where * 4 + 1] = map_3d[1, kpt_from_2d[1], kpt_from_2d[0]] * AVG_PERSON_HEIGHT
                            poses_3d[pose_id][kpt_id_where * 4 + 2] = map_3d[2, kpt_from_2d[1], kpt_from_2d[0]] * AVG_PERSON_HEIGHT
                        break

    return poses_3d, np.array(poses_2d), features.shape


previous_poses_2d = []


def parse_poses(inference_results, input_scale, stride, fx, is_video=False):
    """Convert raw network outputs into image-space 2D poses and translated
    3D poses; when `is_video`, also track pose ids across frames."""
    global previous_poses_2d
    poses_3d, poses_2d, features_shape = get_root_relative_poses(inference_results)
    poses_2d_scaled = []
    for pose_2d in poses_2d:
        num_kpt = (pose_2d.shape[0] - 1) // 3
        pose_2d_scaled = np.ones(pose_2d.shape[0], dtype=np.float32) * -1  # +1 for pose confidence
        for kpt_id in range(num_kpt):
            if pose_2d[kpt_id * 3 + 2] != -1:
                pose_2d_scaled[kpt_id * 3] = int(pose_2d[kpt_id * 3] * stride / input_scale)
                pose_2d_scaled[kpt_id * 3 + 1] = int(pose_2d[kpt_id * 3 + 1] * stride / input_scale)
                pose_2d_scaled[kpt_id * 3 + 2] = pose_2d[kpt_id * 3 + 2]
        pose_2d_scaled[-1] = pose_2d[-1]
        poses_2d_scaled.append(pose_2d_scaled)

    if is_video:  # track poses ids
        current_poses_2d = []
        for pose_id in range(len(poses_2d_scaled)):
            pose_keypoints = np.ones((Pose.num_kpts, 2), dtype=np.int32) * -1
            for kpt_id in range(Pose.num_kpts):
                if poses_2d_scaled[pose_id][kpt_id * 3 + 2] != -1.0:  # keypoint is found
                    pose_keypoints[kpt_id, 0] = int(poses_2d_scaled[pose_id][kpt_id * 3 + 0])
                    pose_keypoints[kpt_id, 1] = int(poses_2d_scaled[pose_id][kpt_id * 3 + 1])
            pose = Pose(pose_keypoints, poses_2d_scaled[pose_id][-1])
            current_poses_2d.append(pose)
        propagate_ids(previous_poses_2d, current_poses_2d)
        previous_poses_2d = current_poses_2d

    translated_poses_3d = []
    # translate poses
    for pose_id in range(len(poses_3d)):
        pose_3d = poses_3d[pose_id].reshape((-1, 4)).transpose()
        pose_2d = poses_2d[pose_id][:-1].reshape((-1, 3)).transpose()
        num_valid = np.count_nonzero(pose_2d[2] != -1)
        pose_3d_valid = np.zeros((3, num_valid), dtype=np.float32)
        pose_2d_valid = np.zeros((2, num_valid), dtype=np.float32)
        valid_id = 0
        for kpt_id in range(pose_3d.shape[1]):
            if pose_2d[2, kpt_id] == -1:
                continue
            pose_3d_valid[:, valid_id] = pose_3d[0:3, kpt_id]
            pose_2d_valid[:, valid_id] = pose_2d[0:2, kpt_id]
            valid_id += 1

        # center 2D coordinates around the feature-map midpoint
        pose_2d_valid[0] = pose_2d_valid[0] - features_shape[2] / 2
        pose_2d_valid[1] = pose_2d_valid[1] - features_shape[1] / 2
        mean_3d = np.expand_dims(pose_3d_valid.mean(axis=1), axis=1)
        mean_2d = np.expand_dims(pose_2d_valid.mean(axis=1), axis=1)
        # least-squares scale between centered 3D and 2D keypoint clouds
        numerator = np.trace(
            np.dot(
                (pose_3d_valid[:2, :] - mean_3d[:2, :]).transpose(),
                pose_3d_valid[:2, :] - mean_3d[:2, :],
            )
        ).sum()
        numerator = np.sqrt(numerator)
        denominator = np.sqrt(
            np.trace(
                np.dot(
                    (pose_2d_valid[:2, :] - mean_2d[:2, :]).transpose(),
                    pose_2d_valid[:2, :] - mean_2d[:2, :],
                )
            ).sum()
        )
        mean_2d = np.array([mean_2d[0, 0], mean_2d[1, 0], fx * input_scale / stride])
        mean_3d = np.array([mean_3d[0, 0], mean_3d[1, 0], 0])
        translation = numerator / denominator * mean_2d - mean_3d

        if is_video:
            translation = current_poses_2d[pose_id].filter(translation)
        for kpt_id in range(19):
            pose_3d[0, kpt_id] = pose_3d[0, kpt_id] + translation[0]
            pose_3d[1, kpt_id] = pose_3d[1, kpt_id] + translation[1]
            pose_3d[2, kpt_id] = pose_3d[2, kpt_id] + translation[2]
        translated_poses_3d.append(pose_3d.transpose().reshape(-1))

    return np.array(translated_poses_3d), np.array(poses_2d_scaled)


def get_alpha(rate=30, cutoff=1):
    """Compute the low-pass smoothing factor for a given sample rate and cutoff."""
    tau = 1 / (2 * math.pi * cutoff)
    te = 1 / rate
    return 1 / (1 + tau / te)


class LowPassFilter:
    """Simple exponential smoothing filter: y = alpha*x + (1-alpha)*y_prev."""

    def __init__(self):
        self.x_previous = None

    def __call__(self, x, alpha=0.5):
        if self.x_previous is None:
            self.x_previous = x
            return x
        x_filtered = alpha * x + (1 - alpha) * self.x_previous
        self.x_previous = x_filtered
        return x_filtered


class OneEuroFilter:
    """One Euro filter: adaptive low-pass filter whose cutoff grows with the
    smoothed signal derivative (Casiez et al.)."""

    def __init__(self, freq=15, mincutoff=1, beta=1, dcutoff=1):
        self.freq = freq
        self.mincutoff = mincutoff
        self.beta = beta
        self.dcutoff = dcutoff
        self.filter_x = LowPassFilter()
        self.filter_dx = LowPassFilter()
        self.x_previous = None
        self.dx = None

    def __call__(self, x):
        if self.dx is None:
            self.dx = 0
        else:
            self.dx = (x - self.x_previous) * self.freq
        dx_smoothed = self.filter_dx(self.dx, get_alpha(self.freq, self.dcutoff))
        cutoff = self.mincutoff + self.beta * abs(dx_smoothed)
        x_filtered = self.filter_x(x, get_alpha(self.freq, cutoff))
        self.x_previous = x
        return x_filtered


BODY_PARTS_KPT_IDS = [
    [1, 2],
    [1, 5],
    [2, 3],
    [3, 4],
    [5, 6],
    [6, 7],
    [1, 8],
    [8, 9],
    [9, 10],
    [1, 11],
    [11, 12],
    [12, 13],
    [1, 0],
    [0, 14],
    [14, 16],
    [0, 15],
    [15, 17],
    [2, 16],
    [5, 17],
]
BODY_PARTS_PAF_IDS = (
    [12, 13],
    [20, 21],
    [14, 15],
    [16, 17],
    [22, 23],
    [24, 25],
    [0, 1],
    [2, 3],
    [4, 5],
    [6, 7],
    [8, 9],
    [10, 11],
    [28, 29],
    [30, 31],
    [34, 35],
    [32, 33],
    [36, 37],
    [18, 19],
    [26, 27],
)


def linspace2d(start, stop, n=10):
    """Return n evenly spaced 2D points from `start` to `stop`, shape (2, n)."""
    points = 1 / (n - 1) * (stop - start)
    return points[:, None] * np.arange(n) + start[:, None]


def extract_keypoints(heatmap, all_keypoints, total_keypoint_num):
    """Find local-maximum peaks in `heatmap`, NMS them (radius 6), and append
    (x, y, score, global_id) tuples to `all_keypoints`. Returns the number
    of keypoints added."""
    heatmap[heatmap < 0.1] = 0
    heatmap_with_borders = np.pad(heatmap, [(2, 2), (2, 2)], mode="constant")
    heatmap_center = heatmap_with_borders[1 : heatmap_with_borders.shape[0] - 1, 1 : heatmap_with_borders.shape[1] - 1]
    heatmap_left = heatmap_with_borders[1 : heatmap_with_borders.shape[0] - 1, 2 : heatmap_with_borders.shape[1]]
    heatmap_right = heatmap_with_borders[1 : heatmap_with_borders.shape[0] - 1, 0 : heatmap_with_borders.shape[1] - 2]
    heatmap_up = heatmap_with_borders[2 : heatmap_with_borders.shape[0], 1 : heatmap_with_borders.shape[1] - 1]
    heatmap_down = heatmap_with_borders[0 : heatmap_with_borders.shape[0] - 2, 1 : heatmap_with_borders.shape[1] - 1]

    # a peak is strictly greater than all 4-neighbours
    heatmap_peaks = (heatmap_center > heatmap_left) & (heatmap_center > heatmap_right) & (heatmap_center > heatmap_up) & (heatmap_center > heatmap_down)
    heatmap_peaks = heatmap_peaks[1 : heatmap_center.shape[0] - 1, 1 : heatmap_center.shape[1] - 1]
    keypoints = list(zip(np.nonzero(heatmap_peaks)[1], np.nonzero(heatmap_peaks)[0]))  # (w, h)
    keypoints = sorted(keypoints, key=itemgetter(0))

    suppressed = np.zeros(len(keypoints), np.uint8)
    keypoints_with_score_and_id = []
    keypoint_num = 0
    for i in range(len(keypoints)):
        if suppressed[i]:
            continue
        for j in range(i + 1, len(keypoints)):
            if math.sqrt((keypoints[i][0] - keypoints[j][0]) ** 2 + (keypoints[i][1] - keypoints[j][1]) ** 2) < 6:
                suppressed[j] = 1
        keypoint_with_score_and_id = (
            keypoints[i][0],
            keypoints[i][1],
            heatmap[keypoints[i][1], keypoints[i][0]],
            total_keypoint_num + keypoint_num,
        )
        keypoints_with_score_and_id.append(keypoint_with_score_and_id)
        keypoint_num += 1
    all_keypoints.append(keypoints_with_score_and_id)
    return keypoint_num


def group_keypoints(all_keypoints_by_type, pafs, pose_entry_size=20, min_paf_score=0.05):
    """Group detected keypoints into pose entries by scoring candidate limb
    connections along the part affinity fields (PAFs)."""
    pose_entries = []
    all_keypoints = np.array([item for sublist in all_keypoints_by_type for item in sublist])
    for part_id in range(len(BODY_PARTS_PAF_IDS)):
        part_pafs = pafs[BODY_PARTS_PAF_IDS[part_id]]
        kpts_a = all_keypoints_by_type[BODY_PARTS_KPT_IDS[part_id][0]]
        kpts_b = all_keypoints_by_type[BODY_PARTS_KPT_IDS[part_id][1]]
        num_kpts_a = len(kpts_a)
        num_kpts_b = len(kpts_b)
        kpt_a_id = BODY_PARTS_KPT_IDS[part_id][0]
        kpt_b_id = BODY_PARTS_KPT_IDS[part_id][1]

        if num_kpts_a == 0 and num_kpts_b == 0:  # no keypoints for such body part
            continue
        elif num_kpts_a == 0:  # body part has just 'b' keypoints
            for i in range(num_kpts_b):
                num = 0
                for j in range(len(pose_entries)):  # check if already in some pose, was added by another body part
                    if pose_entries[j][kpt_b_id] == kpts_b[i][3]:
                        num += 1
                        continue
                if num == 0:
                    pose_entry = np.ones(pose_entry_size) * -1
                    pose_entry[kpt_b_id] = kpts_b[i][3]  # keypoint idx
                    pose_entry[-1] = 1  # num keypoints in pose
                    pose_entry[-2] = kpts_b[i][2]  # pose score
                    pose_entries.append(pose_entry)
            continue
        elif num_kpts_b == 0:  # body part has just 'a' keypoints
            for i in range(num_kpts_a):
                num = 0
                for j in range(len(pose_entries)):
                    if pose_entries[j][kpt_a_id] == kpts_a[i][3]:
                        num += 1
                        continue
                if num == 0:
                    pose_entry = np.ones(pose_entry_size) * -1
                    pose_entry[kpt_a_id] = kpts_a[i][3]
                    pose_entry[-1] = 1
                    pose_entry[-2] = kpts_a[i][2]
                    pose_entries.append(pose_entry)
            continue

        connections = []
        for i in range(num_kpts_a):
            kpt_a = np.array(kpts_a[i][0:2])
            for j in range(num_kpts_b):
                kpt_b = np.array(kpts_b[j][0:2])
                mid_point = [(), ()]
                mid_point[0] = (
                    int(round((kpt_a[0] + kpt_b[0]) * 0.5)),
                    int(round((kpt_a[1] + kpt_b[1]) * 0.5)),
                )
                mid_point[1] = mid_point[0]

                vec = [kpt_b[0] - kpt_a[0], kpt_b[1] - kpt_a[1]]
                vec_norm = math.sqrt(vec[0] ** 2 + vec[1] ** 2)
                if vec_norm == 0:
                    continue
                vec[0] /= vec_norm
                vec[1] /= vec_norm
                cur_point_score = vec[0] * part_pafs[0, mid_point[0][1], mid_point[0][0]] + vec[1] * part_pafs[1, mid_point[1][1], mid_point[1][0]]

                height_n = pafs.shape[1] // 2
                success_ratio = 0
                point_num = 10  # number of points to integration over paf
                ratio = 0
                if cur_point_score > -100:
                    passed_point_score = 0
                    passed_point_num = 0
                    x, y = linspace2d(kpt_a, kpt_b)
                    for point_idx in range(point_num):
                        px = int(x[point_idx])
                        py = int(y[point_idx])
                        paf = part_pafs[:, py, px]
                        cur_point_score = vec[0] * paf[0] + vec[1] * paf[1]
                        if cur_point_score > min_paf_score:
                            passed_point_score += cur_point_score
                            passed_point_num += 1
                    success_ratio = passed_point_num / point_num
                    if passed_point_num > 0:
                        ratio = passed_point_score / passed_point_num
                    # penalize connections much longer than half the map height
                    ratio += min(height_n / vec_norm - 1, 0)
                if ratio > 0 and success_ratio > 0.8:
                    score_all = ratio + kpts_a[i][2] + kpts_b[j][2]
                    connections.append([i, j, ratio, score_all])
        if len(connections) > 0:
            connections = sorted(connections, key=itemgetter(2), reverse=True)

        # greedily keep the best one-to-one matches
        num_connections = min(num_kpts_a, num_kpts_b)
        has_kpt_a = np.zeros(num_kpts_a, dtype=np.int32)
        has_kpt_b = np.zeros(num_kpts_b, dtype=np.int32)
        filtered_connections = []
        for row in range(len(connections)):
            if len(filtered_connections) == num_connections:
                break
            i, j, cur_point_score = connections[row][0:3]
            if not has_kpt_a[i] and not has_kpt_b[j]:
                filtered_connections.append([kpts_a[i][3], kpts_b[j][3], cur_point_score])
                has_kpt_a[i] = 1
                has_kpt_b[j] = 1
        connections = filtered_connections
        if len(connections) == 0:
            continue

        if part_id == 0:
            # first limb seeds the pose entries
            pose_entries = [np.ones(pose_entry_size) * -1 for _ in range(len(connections))]
            for i in range(len(connections)):
                pose_entries[i][BODY_PARTS_KPT_IDS[0][0]] = connections[i][0]
                pose_entries[i][BODY_PARTS_KPT_IDS[0][1]] = connections[i][1]
                pose_entries[i][-1] = 2
                pose_entries[i][-2] = np.sum(all_keypoints[connections[i][0:2], 2]) + connections[i][2]
        elif part_id == 17 or part_id == 18:
            # ear-shoulder connections only fill gaps in existing entries
            kpt_a_id = BODY_PARTS_KPT_IDS[part_id][0]
            kpt_b_id = BODY_PARTS_KPT_IDS[part_id][1]
            for i in range(len(connections)):
                for j in range(len(pose_entries)):
                    if pose_entries[j][kpt_a_id] == connections[i][0] and pose_entries[j][kpt_b_id] == -1:
                        pose_entries[j][kpt_b_id] = connections[i][1]
                    elif pose_entries[j][kpt_b_id] == connections[i][1] and pose_entries[j][kpt_a_id] == -1:
                        pose_entries[j][kpt_a_id] = connections[i][0]
            continue
        else:
            kpt_a_id = BODY_PARTS_KPT_IDS[part_id][0]
            kpt_b_id = BODY_PARTS_KPT_IDS[part_id][1]
            for i in range(len(connections)):
                num = 0
                for j in range(len(pose_entries)):
                    if pose_entries[j][kpt_a_id] == connections[i][0]:
                        pose_entries[j][kpt_b_id] = connections[i][1]
                        num += 1
                        pose_entries[j][-1] += 1
                        pose_entries[j][-2] += all_keypoints[connections[i][1], 2] + connections[i][2]
                if num == 0:
                    pose_entry = np.ones(pose_entry_size) * -1
                    pose_entry[kpt_a_id] = connections[i][0]
                    pose_entry[kpt_b_id] = connections[i][1]
                    pose_entry[-1] = 2
                    pose_entry[-2] = np.sum(all_keypoints[connections[i][0:2], 2]) + connections[i][2]
                    pose_entries.append(pose_entry)

    filtered_entries = []
    for i in range(len(pose_entries)):
        # drop poses with too few keypoints or too low average score
        if pose_entries[i][-1] < 3 or (pose_entries[i][-2] / pose_entries[i][-1] < 0.2):
            continue
        filtered_entries.append(pose_entries[i])
    pose_entries = np.asarray(filtered_entries)
    return pose_entries, all_keypoints


def extract_poses(heatmaps, pafs, upsample_ratio):
    """Upsample heatmaps/PAFs, detect keypoints, group them into poses and
    return (poses array, None). Each pose row is 18*(x, y, conf) + score."""
    heatmaps = np.transpose(heatmaps, (1, 2, 0))
    pafs = np.transpose(pafs, (1, 2, 0))
    heatmaps = cv2.resize(heatmaps, dsize=None, fx=upsample_ratio, fy=upsample_ratio)
    pafs = cv2.resize(pafs, dsize=None, fx=upsample_ratio, fy=upsample_ratio)
    heatmaps = np.transpose(heatmaps, (2, 0, 1))
    pafs = np.transpose(pafs, (2, 0, 1))

    num_keypoints = heatmaps.shape[0]
    total_keypoints_num = 0
    all_keypoints_by_type = []
    for kpt_idx in range(num_keypoints):
        total_keypoints_num += extract_keypoints(heatmaps[kpt_idx], all_keypoints_by_type, total_keypoints_num)
    pose_entries, all_keypoints = group_keypoints(all_keypoints_by_type, pafs)

    found_poses = []
    for pose_entry in pose_entries:
        if len(pose_entry) == 0:
            continue
        pose_keypoints = np.ones((num_keypoints * 3 + 1), dtype=np.float32) * -1
        for kpt_id in range(num_keypoints):
            if pose_entry[kpt_id] != -1.0:
                pose_keypoints[kpt_id * 3 + 0] = all_keypoints[int(pose_entry[kpt_id]), 0]
                pose_keypoints[kpt_id * 3 + 1] = all_keypoints[int(pose_entry[kpt_id]), 1]
                pose_keypoints[kpt_id * 3 + 2] = all_keypoints[int(pose_entry[kpt_id]), 2]
        pose_keypoints[-1] = pose_entry[18]
        found_poses.append(pose_keypoints)

    if not found_poses:
        return np.array(found_poses, dtype=np.float32).reshape((0, 0)), None

    return np.array(found_poses, dtype=np.float32), None