File size: 4,823 Bytes
f53b39e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
import argparse
import os
from pathlib import Path
import cv2
import numpy as np
import submitit
import tqdm
def get_args_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the SA-V frame-extraction launcher.

    Options are organised into three groups: input data, SLURM launch
    settings, and output locations. Every option except
    ``--sav-frame-sample-rate`` is required.

    Returns:
        The configured parser; the caller is responsible for calling
        ``parse_args`` on it.
    """
    parser = argparse.ArgumentParser(
        description="[SA-V Preprocessing] Extracting JPEG frames",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

    # ------------
    # DATA
    # ------------
    data_parser = parser.add_argument_group(
        title="SA-V dataset data root",
        description="What data to load and how to process it.",
    )
    data_parser.add_argument(
        "--sav-vid-dir",
        type=str,
        required=True,
        help="Where to find the SAV videos",
    )
    data_parser.add_argument(
        "--sav-frame-sample-rate",
        type=int,
        default=4,
        help="Rate at which to sub-sample frames",
    )

    # ------------
    # LAUNCH
    # ------------
    launch_parser = parser.add_argument_group(
        title="Cluster launch settings",
        description="Number of jobs and retry settings.",
    )
    launch_parser.add_argument(
        "--n-jobs",
        type=int,
        required=True,
        help="Shard the run over this many jobs.",
    )
    launch_parser.add_argument(
        "--timeout", type=int, required=True, help="SLURM timeout parameter in minutes."
    )
    launch_parser.add_argument(
        "--partition", type=str, required=True, help="Partition to launch on."
    )
    # The help text here used to be a copy of the --partition help.
    launch_parser.add_argument(
        "--account", type=str, required=True, help="SLURM account to launch under."
    )
    launch_parser.add_argument("--qos", type=str, required=True, help="QOS.")

    # ------------
    # OUTPUT
    # ------------
    output_parser = parser.add_argument_group(
        title="Setting for results output", description="Where and how to save results."
    )
    output_parser.add_argument(
        "--output-dir",
        type=str,
        required=True,
        help="Where to dump the extracted jpeg frames",
    )
    output_parser.add_argument(
        "--slurm-output-root-dir",
        type=str,
        required=True,
        help="Where to save slurm outputs",
    )
    return parser
def decode_video(video_path: str):
    """Decode every frame of a video file into memory.

    Args:
        video_path: Path to the video file to read with OpenCV.

    Returns:
        A list of the frames returned by ``cv2.VideoCapture.read``, in
        decoding order. The list is empty when the capture cannot be opened
        or the first read fails.

    Raises:
        AssertionError: If ``video_path`` does not exist.
    """
    assert os.path.exists(video_path), f"Video not found: {video_path}"
    video = cv2.VideoCapture(video_path)
    video_frames = []
    try:
        while video.isOpened():
            ret, frame = video.read()
            if not ret:
                # read() reports failure at end of stream (or on a decode error).
                break
            video_frames.append(frame)
    finally:
        # Always free the underlying decoder/file handle; one shard decodes
        # many videos in the same process, so unreleased captures pile up.
        video.release()
    return video_frames
def extract_frames(video_path, sample_rate):
    """Decode a video and keep every ``sample_rate``-th frame, starting at frame 0.

    Args:
        video_path: Path of the video, forwarded to ``decode_video``.
        sample_rate: Step used to slice the list of decoded frames.

    Returns:
        The sub-sampled list of frames.
    """
    every_frame = decode_video(video_path)
    stride = slice(None, None, sample_rate)
    return every_frame[stride]
def submitit_launch(video_paths, sample_rate, save_root):
    """Extract sub-sampled JPEG frames for one shard of videos.

    For each video, the kept frames are written to
    ``<save_root>/<video file stem>/<index>.jpg``, where ``<index>`` is the
    zero-padded position of the frame in the original (un-sampled) video.

    Args:
        video_paths: Iterable of video file paths handled by this job.
        sample_rate: Keep every ``sample_rate``-th frame of each video.
        save_root: Directory under which the per-video folders are created.
    """
    for path in tqdm.tqdm(video_paths):
        frames = extract_frames(path, sample_rate)
        output_folder = os.path.join(save_root, Path(path).stem)
        # exist_ok avoids the check-then-create race when the folder appears
        # between the existence test and the creation (e.g. re-runs or
        # concurrently running jobs).
        os.makedirs(output_folder, exist_ok=True)
        for fid, frame in enumerate(frames):
            # fid counts kept frames; multiply back to the original frame index.
            frame_path = os.path.join(output_folder, f"{fid * sample_rate:05d}.jpg")
            # imwrite signals failure through its return value; report it
            # instead of silently producing an incomplete frame folder.
            if not cv2.imwrite(frame_path, frame):
                print(f"WARNING: failed to write frame to {frame_path}")
    print(f"Saved output to {save_root}")
if __name__ == "__main__":
    # Entry point: shard the SA-V videos and submit one SLURM array task per
    # shard through submitit; each task runs `submitit_launch` on its shard.
    parser = get_args_parser()
    args = parser.parse_args()

    sav_vid_dir = args.sav_vid_dir
    save_root = args.output_dir
    sample_rate = args.sav_frame_sample_rate

    # List all SA-V videos (searched one directory level below the root).
    mp4_files = sorted(str(p) for p in Path(sav_vid_dir).glob("*/*.mp4"))
    if not mp4_files:
        # Nothing to do: avoid queueing n_jobs no-op jobs on the cluster.
        raise SystemExit(f"No .mp4 files found in {sav_vid_dir} (searched */*.mp4)")

    # Split into at most n_jobs shards; when n_jobs exceeds the number of
    # videos, array_split yields empty shards, which are dropped here.
    chunked_mp4_files = [
        chunk.tolist()
        for chunk in np.array_split(np.array(mp4_files), args.n_jobs)
        if len(chunk) > 0
    ]
    n_shards = len(chunked_mp4_files)

    print(f"Processing videos in: {sav_vid_dir}")
    print(f"Processing {len(mp4_files)} files")
    print(f"Beginning processing in {n_shards} processes")

    # Submitit params ("%j" is expanded by submitit to the job id)
    jobs_dir = os.path.join(args.slurm_output_root_dir, "%j")
    cpus_per_task = 4
    executor = submitit.AutoExecutor(folder=jobs_dir)
    executor.update_parameters(
        timeout_min=args.timeout,
        gpus_per_node=0,
        tasks_per_node=1,
        slurm_array_parallelism=n_shards,
        cpus_per_task=cpus_per_task,
        slurm_partition=args.partition,
        slurm_account=args.account,
        slurm_qos=args.qos,
    )
    executor.update_parameters(slurm_srun_args=["-vv", "--cpu-bind", "none"])

    # Launch: submissions made inside batch() are grouped into one job array.
    jobs = []
    with executor.batch():
        for mp4_chunk in tqdm.tqdm(chunked_mp4_files):
            job = executor.submit(
                submitit_launch,
                video_paths=mp4_chunk,
                sample_rate=sample_rate,
                save_root=save_root,
            )
            jobs.append(job)

    # Job ids are only available once the batch context has exited.
    for j in jobs:
        print(f"Slurm JobID: {j.job_id}")
    print(f"Saving outputs to {save_root}")
    print(f"Slurm outputs at {args.slurm_output_root_dir}")
|