|
|
|
|
|
""" |
|
functions for processing video |
|
""" |
|
|
|
import os.path as osp |
|
import numpy as np |
|
import subprocess |
|
import imageio |
|
import cv2 |
|
|
|
|
|
from .helper import prefix |
|
from .rprint import rprint as print |
|
|
|
|
|
def exec_cmd(cmd): |
|
subprocess.run(cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) |
|
|
|
|
|
def images2video(images, wfp, **kwargs): |
|
fps = kwargs.get('fps', 30) |
|
video_format = kwargs.get('format', 'mp4') |
|
codec = kwargs.get('codec', 'libx264') |
|
quality = kwargs.get('quality') |
|
pixelformat = kwargs.get('pixelformat', 'yuv420p') |
|
image_mode = kwargs.get('image_mode', 'rgb') |
|
macro_block_size = kwargs.get('macro_block_size', 2) |
|
ffmpeg_params = ['-crf', str(kwargs.get('crf', 18))] |
|
|
|
writer = imageio.get_writer( |
|
wfp, fps=fps, format=video_format, |
|
codec=codec, quality=quality, ffmpeg_params=ffmpeg_params, pixelformat=pixelformat, macro_block_size=macro_block_size |
|
) |
|
|
|
n = len(images) |
|
print('writing',n) |
|
for i in range(n): |
|
if image_mode.lower() == 'bgr': |
|
writer.append_data(images[i][..., ::-1]) |
|
else: |
|
writer.append_data(images[i]) |
|
|
|
writer.close() |
|
|
|
|
|
print(f'Dump to {wfp}\n') |
|
|
|
|
|
def video2gif(video_fp, fps=30, size=256): |
|
if osp.exists(video_fp): |
|
d = osp.split(video_fp)[0] |
|
fn = prefix(osp.basename(video_fp)) |
|
palette_wfp = osp.join(d, 'palette.png') |
|
gif_wfp = osp.join(d, f'{fn}.gif') |
|
|
|
cmd = f'ffmpeg -i {video_fp} -vf "fps={fps},scale={size}:-1:flags=lanczos,palettegen" {palette_wfp} -y' |
|
exec_cmd(cmd) |
|
|
|
cmd = f'ffmpeg -i {video_fp} -i {palette_wfp} -filter_complex "fps={fps},scale={size}:-1:flags=lanczos[x];[x][1:v]paletteuse" {gif_wfp} -y' |
|
exec_cmd(cmd) |
|
else: |
|
print(f'video_fp: {video_fp} not exists!') |
|
|
|
|
|
def merge_audio_video(video_fp, audio_fp, wfp): |
|
if osp.exists(video_fp) and osp.exists(audio_fp): |
|
cmd = f'ffmpeg -i {video_fp} -i {audio_fp} -c:v copy -c:a aac {wfp} -y' |
|
exec_cmd(cmd) |
|
print(f'merge {video_fp} and {audio_fp} to {wfp}') |
|
else: |
|
print(f'video_fp: {video_fp} or audio_fp: {audio_fp} not exists!') |
|
|
|
|
|
def blend(img: np.ndarray, mask: np.ndarray, background_color=(255, 255, 255)): |
|
mask_float = mask.astype(np.float32) / 255. |
|
background_color = np.array(background_color).reshape([1, 1, 3]) |
|
bg = np.ones_like(img) * background_color |
|
img = np.clip(mask_float * img + (1 - mask_float) * bg, 0, 255).astype(np.uint8) |
|
return img |
|
|
|
|
|
def concat_frames(I_p_lst, driving_rgb_lst, img_rgb): |
|
|
|
out_lst = [] |
|
print('Concatenating result...',len(I_p_lst)) |
|
for idx, _ in enumerate(I_p_lst): |
|
|
|
source_image_drived = I_p_lst[idx] |
|
image_drive = driving_rgb_lst[idx] |
|
|
|
|
|
h, w, _ = source_image_drived.shape |
|
image_drive_resized = cv2.resize(image_drive, (w, h)) |
|
img_rgb_resized = cv2.resize(img_rgb, (w, h)) |
|
|
|
|
|
frame = np.concatenate((image_drive_resized, img_rgb_resized, source_image_drived), axis=1) |
|
out_lst.append(frame) |
|
return out_lst |
|
|
|
|
|
class VideoWriter: |
|
def __init__(self, **kwargs): |
|
self.fps = kwargs.get('fps', 30) |
|
self.wfp = kwargs.get('wfp', 'video.mp4') |
|
self.video_format = kwargs.get('format', 'mp4') |
|
self.codec = kwargs.get('codec', 'libx264') |
|
self.quality = kwargs.get('quality') |
|
self.pixelformat = kwargs.get('pixelformat', 'yuv420p') |
|
self.image_mode = kwargs.get('image_mode', 'rgb') |
|
self.ffmpeg_params = kwargs.get('ffmpeg_params') |
|
|
|
self.writer = imageio.get_writer( |
|
self.wfp, fps=self.fps, format=self.video_format, |
|
codec=self.codec, quality=self.quality, |
|
ffmpeg_params=self.ffmpeg_params, pixelformat=self.pixelformat |
|
) |
|
|
|
def write(self, image): |
|
if self.image_mode.lower() == 'bgr': |
|
self.writer.append_data(image[..., ::-1]) |
|
else: |
|
self.writer.append_data(image) |
|
|
|
def close(self): |
|
if self.writer is not None: |
|
self.writer.close() |
|
|
|
|
|
def change_video_fps(input_file, output_file, fps=20, codec='libx264', crf=5): |
|
cmd = f"ffmpeg -i {input_file} -c:v {codec} -crf {crf} -r {fps} {output_file} -y" |
|
exec_cmd(cmd) |
|
|
|
|
|
def get_fps(filepath): |
|
import ffmpeg |
|
probe = ffmpeg.probe(filepath) |
|
video_stream = next((stream for stream in probe['streams'] if stream['codec_type'] == 'video'), None) |
|
fps = eval(video_stream['avg_frame_rate']) |
|
return fps |
|
|