# Hugging Face Space application script (Space status when captured: Paused).
import os
import shutil
import subprocess
import sys
import tempfile
import threading

import cv2
import gradio as gr
import huggingface_hub
import yaml
def stream_output(pipe):
    """Echo every line read from *pipe* to stdout, then close the pipe.

    Args:
        pipe: A text-mode file-like object exposing ``readline()`` and
            ``close()``, such as the ``stdout`` or ``stderr`` of a
            ``subprocess.Popen`` started with ``text=True``.

    The pipe is closed even when reading or printing raises, so the
    underlying file descriptor is never leaked; the exception itself is
    still propagated to the caller.
    """
    try:
        # readline() returns '' only at end-of-stream, which is the sentinel
        # that terminates the iterator.
        for line in iter(pipe.readline, ''):
            # Lines already carry their own newline.
            print(line, end='')
    finally:
        pipe.close()
# Log in to the Hugging Face Hub with the token read from the
# GATED_HF_TOKEN environment variable (None when the variable is unset).
HF_TKN = os.environ.get("GATED_HF_TOKEN")
huggingface_hub.login(token=HF_TKN)

# Download the required model files at import time, in this order.
# Each entry is (repo_id, filename, local_dir). 'models/MimicMotion_1.pth'
# is the checkpoint path later written into the inference config by infer().
for _repo_id, _filename, _local_dir in (
    ('yzd-v/DWPose', 'yolox_l.onnx', './models/DWPose'),
    ('yzd-v/DWPose', 'dw-ll_ucoco_384.onnx', './models/DWPose'),
    ('ixaac/MimicMotion', 'MimicMotion_1.pth', './models'),
):
    huggingface_hub.hf_hub_download(
        repo_id=_repo_id,
        filename=_filename,
        local_dir=_local_dir,
    )
def print_directory_contents(path):
    """Print the directory tree rooted at *path* as an indented listing.

    Each directory is printed as ``name/`` and each file on its own line,
    indented by four spaces per nesting level relative to *path*. Nothing is
    printed when *path* does not exist (``os.walk`` yields no entries).

    Args:
        path: Root directory to list. A trailing path separator is accepted.
    """
    for root, _dirs, files in os.walk(path):
        # Derive the depth from the path relative to the walk root. Stripping
        # the root with str.replace() miscounts when `path` ends with a
        # separator or when its text occurs again deeper in the tree.
        rel = os.path.relpath(root, path)
        level = 0 if rel == os.curdir else rel.count(os.sep) + 1
        indent = ' ' * 4 * level
        # normpath drops a trailing separator so basename() is never empty
        # for the top-level directory.
        name = os.path.basename(os.path.normpath(root))
        print(f"{indent}{name}/")
        subindent = ' ' * 4 * (level + 1)
        for f in files:
            print(f"{subindent}{f}")
def check_outputs_folder(folder_path):
    """Empty *folder_path* while keeping the folder itself.

    Regular files and symbolic links are unlinked; real sub-directories are
    removed recursively. A failure on one entry is reported on stdout and
    does not stop the clean-up of the remaining entries. When *folder_path*
    is missing or is not a directory, a message is printed and nothing else
    happens.

    Args:
        folder_path: Path of the directory whose contents are deleted.
    """
    # Guard clause: nothing to clean when the target is not a directory.
    if not os.path.exists(folder_path) or not os.path.isdir(folder_path):
        print(f'The folder {folder_path} does not exist.')
        return

    for entry_name in os.listdir(folder_path):
        entry_path = os.path.join(folder_path, entry_name)
        try:
            if os.path.isdir(entry_path) and not os.path.islink(entry_path):
                # A real directory (not a link to one): remove the whole tree.
                shutil.rmtree(entry_path)
            elif os.path.isfile(entry_path) or os.path.islink(entry_path):
                # A file or any kind of symlink: remove just the entry.
                os.unlink(entry_path)
        except Exception as e:
            print(f'Failed to delete {entry_path}. Reason: {e}')
def check_for_mp4_in_outputs(outputs_folder='./outputs'):
    """Return the path of an ``.mp4`` file found in *outputs_folder*.

    Only the folder's direct entries are inspected (no recursion), and only
    regular files whose name ends with ``.mp4`` are considered.

    Args:
        outputs_folder: Directory to search. Defaults to ``'./outputs'``.

    Returns:
        ``os.path.join(outputs_folder, name)`` for the first matching file
        in sorted name order, or ``None`` when the folder is missing, is not
        a directory, or contains no ``.mp4`` file.
    """
    # isdir() also covers "does not exist" and avoids os.listdir() raising
    # when the path points at a regular file.
    if not os.path.isdir(outputs_folder):
        return None

    # Sort so the choice does not depend on the arbitrary order in which
    # os.listdir() returns entries.
    mp4_files = sorted(
        name
        for name in os.listdir(outputs_folder)
        if name.endswith('.mp4')
        and os.path.isfile(os.path.join(outputs_folder, name))
    )
    if not mp4_files:
        return None
    return os.path.join(outputs_folder, mp4_files[0])
def get_video_fps(video_path):
    """Return the frame rate OpenCV reports for the video at *video_path*.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.

    Returns:
        The value of the ``cv2.CAP_PROP_FPS`` property of the opened capture.

    Raises:
        ValueError: If OpenCV cannot open the video.
    """
    capture = cv2.VideoCapture(video_path)
    if capture.isOpened():
        frame_rate = capture.get(cv2.CAP_PROP_FPS)
        # Release the capture before handing the value back.
        capture.release()
        return frame_rate
    raise ValueError("Error opening video file")
def _build_inference_config(ref_image_path, ref_video_path, fps):
    """Return the config mapping that is dumped to YAML for a single test case."""
    return {
        'base_model_path': 'stabilityai/stable-video-diffusion-img2vid-xt-1-1',
        'ckpt_path': 'models/MimicMotion_1.pth',
        'test_case': [
            {
                'ref_video_path': ref_video_path,
                'ref_image_path': ref_image_path,
                'num_frames': 16,
                'resolution': 576,
                'frames_overlap': 6,
                'num_inference_steps': 25,
                'noise_aug_strength': 0,
                'guidance_scale': 2.0,
                'sample_stride': 2,
                'fps': fps,
                'seed': 42,
            }
        ],
    }


def _run_inference_script(config_path):
    """Run ``inference.py`` on *config_path*, echo its output, return its exit code."""
    # Use the interpreter running this app rather than whichever `python`
    # happens to be first on PATH (it may be missing or lack the dependencies).
    interpreter = sys.executable or 'python'
    command = [interpreter, 'inference.py', '--inference_config', config_path]
    process = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        bufsize=1,
    )
    # Drain stdout and stderr concurrently so the child can never block on a
    # full pipe while this process is busy reading the other one.
    readers = [
        threading.Thread(target=stream_output, args=(pipe,))
        for pipe in (process.stdout, process.stderr)
    ]
    for reader in readers:
        reader.start()
    process.wait()
    for reader in readers:
        reader.join()
    return process.returncode


def infer(ref_image_in, ref_video_in):
    """Run the inference script on a reference image and a reference video.

    The './outputs' folder is emptied, a YAML config is written to a
    temporary directory, ``inference.py`` is executed as a subprocess with
    its output echoed to stdout, and './outputs' is then searched for an
    ``.mp4`` file (presumably written there by the script; confirm against
    inference.py).

    Args:
        ref_image_in: File path of the reference image.
        ref_video_in: File path of the reference video; its frame rate is
            read and stored in the config.

    Returns:
        Path of the ``.mp4`` file found in './outputs'.

    Raises:
        gr.Error: If an input is missing, or if no ``.mp4`` file exists in
            './outputs' after the script has run.
        ValueError: If the reference video cannot be opened to read its FPS.
    """
    # Fail early with a user-visible message instead of an obscure error
    # from OpenCV or from the inference script.
    if ref_image_in is None or ref_video_in is None:
        raise gr.Error("Please provide both a reference image and a reference video.")

    # Empty 'outputs' so the file found afterwards comes from this run.
    check_outputs_folder('./outputs')

    with tempfile.TemporaryDirectory() as temp_dir:
        print("Temporary directory created:", temp_dir)

        data = _build_inference_config(
            ref_image_path=ref_image_in,
            ref_video_path=ref_video_in,
            fps=get_video_fps(ref_video_in),
        )

        file_path = os.path.join(temp_dir, 'config.yaml')
        with open(file_path, 'w') as file:
            yaml.dump(data, file, default_flow_style=False)
        print("YAML file 'config.yaml' created successfully in", file_path)

        # The config lives in the temporary directory, so the script has to
        # finish before the `with` block exits.
        return_code = _run_inference_script(file_path)
        print("Inference script finished with return code:", return_code)

    print_directory_contents('./outputs')

    mp4_file_path = check_for_mp4_in_outputs()
    print(mp4_file_path)
    if mp4_file_path is None:
        # Surface the failure in the UI instead of silently showing nothing.
        raise gr.Error(
            f"Inference produced no video (inference.py exited with code {return_code})."
        )
    return mp4_file_path
# Gradio UI: two inputs (reference image, reference video), a submit button,
# and one output video; clicking the button calls infer().
# NOTE(review): the source lost its indentation, so the nesting of the
# layout containers below is a reconstruction - confirm against the
# intended layout.
with gr.Blocks() as demo:
    with gr.Column():
        with gr.Row():
            with gr.Column():
                with gr.Row():
                    # type="filepath" makes Gradio pass the image to infer()
                    # as a path on disk.
                    ref_image_in = gr.Image(type="filepath")
                    ref_video_in = gr.Video()
                submit_btn = gr.Button("Submit")
            output_video = gr.Video()
    # The value returned by infer() is shown in output_video.
    submit_btn.click(
        fn = infer,
        inputs = [ref_image_in, ref_video_in],
        outputs = [output_video]
    )
# Start the web app.
demo.launch()