# VNext video instance segmentation — Gradio demo (Hugging Face Space).
import argparse
import atexit
import bisect
import glob
import multiprocessing as mp
import os
import subprocess
import sys
import tempfile
import time
import warnings
from collections import deque

import cv2
import gradio as gr
import numpy as np
import torch
import tqdm
# Maximum number of frames to process per uploaded video (keeps demo latency bounded).
TOTAL_FRAMES = 60

# Install the VNext detectron2 fork at startup and clone the repo so its
# config files are available on disk. This must run before the detectron2
# imports below. Use the current interpreter's pip (sys.executable -m pip)
# so the package is guaranteed to land in this environment, not whichever
# `pip` happens to be first on PATH.
subprocess.run([sys.executable, "-m", "pip", "install", "git+https://github.com/wjf5203/VNext.git"])
subprocess.run(["git", "clone", "https://github.com/wjf5203/VNext"])
from detectron2.data import MetadataCatalog | |
from detectron2.data.detection_utils import read_image | |
from detectron2.engine.defaults import DefaultPredictor | |
from detectron2.utils.video_visualizer import VideoVisualizer | |
from detectron2.utils.visualizer import ColorMode, Visualizer | |
from detectron2.config import get_cfg | |
from detectron2.utils.logger import setup_logger | |
def test_opencv_video_format(codec, file_ext):
    """Return True if this OpenCV build can write *codec* into *file_ext*.

    Writes a tiny 30-frame test clip into a temporary directory and checks
    that the output file was actually created — cv2.VideoWriter fails
    silently when a codec/container combination is unsupported.

    Args:
        codec: FourCC codec string, e.g. "x264" or "mp4v".
        file_ext: Container extension including the dot, e.g. ".mkv".
    """
    with tempfile.TemporaryDirectory(prefix="video_format_test") as tmpdir:
        # `tmpdir` (was `dir`, shadowing the builtin) is cleaned up on exit.
        filename = os.path.join(tmpdir, "test_file" + file_ext)
        writer = cv2.VideoWriter(
            filename=filename,
            fourcc=cv2.VideoWriter_fourcc(*codec),
            fps=30.0,
            frameSize=(10, 10),
            isColor=True,
        )
        try:
            # Plain loop for side effects (was a throwaway list comprehension).
            for _ in range(30):
                writer.write(np.zeros((10, 10, 3), np.uint8))
        finally:
            # Always release the writer so the OS handle isn't leaked.
            writer.release()
        return os.path.isfile(filename)
def setup_cfg(cfg):
    """Create and return a frozen detectron2 config for the demo model.

    NOTE(review): the *cfg* argument is ignored — a fresh config is built
    internally via get_cfg(). The parameter is kept only so existing call
    sites (e.g. ``setup_cfg({})`` below) keep working; confirm before
    removing it.

    Returns:
        A frozen detectron2 CfgNode loaded from the VNext quick-schedule
        Mask R-CNN config, with all score thresholds set to 0.5.
    """
    # load config from file and command-line arguments
    cfg = get_cfg()
    # To use demo for Panoptic-DeepLab, please uncomment the following two lines.
    # from detectron2.projects.panoptic_deeplab import add_panoptic_deeplab_config  # noqa
    # add_panoptic_deeplab_config(cfg)
    cfg.merge_from_file("VNext/configs/quick_schedules/mask_rcnn_R_50_FPN_inference_acc_test.yaml")
    # Set score_threshold for builtin models
    cfg.MODEL.RETINANET.SCORE_THRESH_TEST = 0.5
    cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = 0.5
    cfg.MODEL.PANOPTIC_FPN.COMBINE.INSTANCES_CONFIDENCE_THRESH = 0.5
    # Freeze so the shared config cannot be mutated after predictor creation.
    cfg.freeze()
    return cfg
# Build the predictor once at module load and reuse it for every request.
predictor = DefaultPredictor(setup_cfg({}))
# No dataset is registered; "__unused" returns an empty metadata entry so the
# visualizer falls back to generic class labels/colors.
metadata = MetadataCatalog.get("__unused")
def run_on_video(video, total_frames):
    """Run the predictor on up to *total_frames* frames of *video*.

    Args:
        video: An opened cv2.VideoCapture.
        total_frames: Maximum number of frames to process and yield.

    Yields:
        BGR visualization frames (np.ndarray) with predictions drawn.
    """
    video_visualizer = VideoVisualizer(metadata, ColorMode.IMAGE)

    def _frame_from_video(video):
        # Yield decoded frames until the stream ends or the capture closes.
        while video.isOpened():
            success, frame = video.read()
            if not success:
                break
            yield frame

    def process_predictions(frame, predictions):
        # The visualizer expects RGB; OpenCV decodes to BGR.
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        if "panoptic_seg" in predictions:
            panoptic_seg, segments_info = predictions["panoptic_seg"]
            vis_frame = video_visualizer.draw_panoptic_seg_predictions(
                frame, panoptic_seg.to("cpu"), segments_info
            )
        elif "instances" in predictions:
            predictions = predictions["instances"].to("cpu")
            vis_frame = video_visualizer.draw_instance_predictions(frame, predictions)
        elif "sem_seg" in predictions:
            vis_frame = video_visualizer.draw_sem_seg(
                frame, predictions["sem_seg"].argmax(dim=0).to("cpu")
            )
        # Convert the Matplotlib RGB output back to OpenCV BGR.
        return cv2.cvtColor(vis_frame.get_image(), cv2.COLOR_RGB2BGR)

    # BUG FIX: the original incremented and compared the counter *before*
    # yielding, so it produced only total_frames - 1 frames (and zero frames
    # when total_frames == 1). Stop only once total_frames frames are out.
    for i, frame in enumerate(_frame_from_video(video)):
        if i >= total_frames:
            return
        yield process_predictions(frame, predictor(frame))
def inference(video):
    """Run instance-segmentation on an uploaded video, return an H.264 MP4 path.

    Args:
        video: Filesystem path to the input video (from the Gradio component).

    Returns:
        Path to the re-encoded result video (browser-playable MP4).
    """
    video = cv2.VideoCapture(video)
    width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
    frames_per_second = video.get(cv2.CAP_PROP_FPS)
    num_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
    print(num_frames)
    # Cap the work per request so the demo stays responsive.
    num_frames = min(num_frames, TOTAL_FRAMES)

    # Prefer x264-in-mkv when this OpenCV build supports it.
    codec, file_ext = (
        ("x264", ".mkv") if test_opencv_video_format("x264", ".mkv") else ("mp4v", ".mp4")
    )
    print(codec, file_ext)
    # BUG FIX: use the extension matching the chosen codec. The original
    # always wrote "result.mp4", mismatching the container whenever the
    # x264/.mkv path was selected.
    output_fname = "result" + file_ext
    output_file = cv2.VideoWriter(
        filename=output_fname,
        fourcc=cv2.VideoWriter_fourcc(*codec),
        fps=float(frames_per_second),
        frameSize=(width, height),
        isColor=True,
    )
    try:
        for vis_frame in tqdm.tqdm(run_on_video(video, num_frames), total=num_frames):
            output_file.write(vis_frame)
    finally:
        # Release capture and writer even if prediction fails mid-stream.
        video.release()
        output_file.release()

    # Re-encode to a browser-friendly H.264 MP4. Close the temp-file handle
    # first so ffmpeg can open the path on all platforms (delete=False keeps
    # the file around for Gradio to serve).
    out_file = tempfile.NamedTemporaryFile(suffix="out.mp4", delete=False)
    out_file.close()
    # Argument list instead of str.split(): robust to spaces in paths and
    # avoids any shell involvement.
    subprocess.run(
        ["ffmpeg", "-y", "-loglevel", "quiet", "-stats",
         "-i", output_fname, "-c:v", "libx264", out_file.name]
    )
    return out_file.name
# Wire the demo UI: one video in, one MP4 out, with two bundled examples.
_iface = gr.Interface(
    fn=inference,
    inputs=[gr.Video(type="file")],
    outputs=gr.Video(type="file", format="mp4"),
    examples=[["inps.mp4"], ["example_3.mp4"]],
    allow_flagging=False,
    allow_screenshot=False,
    title="VNext",
    description="demo for <a href='https://github.com/wjf5203/VNext'>wjf5203/VNext</a>",
)
# launch(debug=True) blocks and surfaces errors in the console, as before.
video_interface = _iface.launch(debug=True)