import gradio as gr
import os
import shutil
import yaml
import tempfile
import cv2
import huggingface_hub
import subprocess
import threading
import torch
from subprocess import getoutput

# Set directly in the code instead of via environment variables
is_shared_ui = False  # or True, depending on your setup

# available_property follows from is_shared_ui: controls stay interactive
# only when the app is not running as a shared UI
available_property = not is_shared_ui

# is_shared_ui and available_property are now managed directly in the code
is_gpu_associated = torch.cuda.is_available()
if is_gpu_associated:
    gpu_info = getoutput('nvidia-smi')
    if "A10G" in gpu_info:
        which_gpu = "A10G"
    elif "T4" in gpu_info:
        which_gpu = "T4"
    else:
        which_gpu = "CPU"  # unrecognized CUDA device; fall back to the CPU label
else:
    which_gpu = "CPU"
def stream_output(pipe):
    # Echo a subprocess pipe to stdout line by line, then close it
    for line in iter(pipe.readline, ''):
        print(line, end='')
    pipe.close()
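# stream_output runs on one thread per pipe (see infer below); draining stdout
# and stderr concurrently avoids the deadlock where a full pipe buffer blocks
# the child process.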
HF_TKN = os.environ.get("GATED_HF_TOKEN")
huggingface_hub.login(token=HF_TKN)
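# Small guard (an addition, not in the original app): without GATED_HF_TOKEN the
# login above receives token=None, and downloads of gated repos (e.g. the SVD
# base model that inference.py pulls) are likely to fail.
if HF_TKN is None:
    print("Warning: GATED_HF_TOKEN is not set; gated model downloads may fail.")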
# Fetch the DWPose estimators and the MimicMotion checkpoint up front
huggingface_hub.hf_hub_download(
    repo_id='yzd-v/DWPose',
    filename='yolox_l.onnx',
    local_dir='./models/DWPose'
)
huggingface_hub.hf_hub_download(
    repo_id='yzd-v/DWPose',
    filename='dw-ll_ucoco_384.onnx',
    local_dir='./models/DWPose'
)
huggingface_hub.hf_hub_download(
    repo_id='ixaac/MimicMotion',
    filename='MimicMotion_1.pth',
    local_dir='./models'
)
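# Resulting local layout (derived from the local_dir/filename arguments above):
#
#   models/
#       MimicMotion_1.pth
#       DWPose/
#           yolox_l.onnx
#           dw-ll_ucoco_384.onnx
#
# Note: the checkpoint dropdown in the UI also offers MimicMotion_1-1.pth,
# which is not downloaded here; selecting it would presumably need the same
# hf_hub_download treatment (an observation, not verified).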
def print_directory_contents(path):
    for root, dirs, files in os.walk(path):
        level = root.replace(path, '').count(os.sep)
        indent = ' ' * 4 * level
        print(f"{indent}{os.path.basename(root)}/")
        subindent = ' ' * 4 * (level + 1)
        for f in files:
            print(f"{subindent}{f}")
def check_outputs_folder(folder_path):
    # Check if the folder exists
    if os.path.exists(folder_path) and os.path.isdir(folder_path):
        # Delete all contents inside the folder
        for filename in os.listdir(folder_path):
            file_path = os.path.join(folder_path, filename)
            try:
                if os.path.isfile(file_path) or os.path.islink(file_path):
                    os.unlink(file_path)  # Remove file or link
                elif os.path.isdir(file_path):
                    shutil.rmtree(file_path)  # Remove directory
            except Exception as e:
                print(f'Failed to delete {file_path}. Reason: {e}')
    else:
        print(f'The folder {folder_path} does not exist.')
def check_for_mp4_in_outputs():
    # Define the path to the outputs folder
    outputs_folder = './outputs'
    # Check if the outputs folder exists
    if not os.path.exists(outputs_folder):
        return None
    # Check if there is a .mp4 file in the outputs folder
    mp4_files = [f for f in os.listdir(outputs_folder) if f.endswith('.mp4')]
    # Return the path to the first mp4 file if one exists
    if mp4_files:
        return os.path.join(outputs_folder, mp4_files[0])
    else:
        return None
def get_video_fps(video_path):
    # Open the video file
    video_capture = cv2.VideoCapture(video_path)
    if not video_capture.isOpened():
        raise ValueError("Error opening video file")
    # Get the FPS value
    fps = video_capture.get(cv2.CAP_PROP_FPS)
    # Release the video capture object
    video_capture.release()
    return fps
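# Note: get_video_fps is not called anywhere in this file; the output frame
# rate comes from the "fps" slider instead. It appears to be a leftover helper.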
def load_examples(ref_image_in, ref_video_in):
    # The example row is wired with run_on_click, so instead of running full
    # inference it returns a pre-rendered result video for the example inputs
    return "./examples/mimicmotion_result1_example.mp4"
def infer(ref_image_in, ref_video_in, num_inference_steps, guidance_scale, output_frames_per_second, seed, checkpoint_version):
    # Check if the 'outputs' dir exists and empty it if necessary
    check_outputs_folder('./outputs')

    # Create a temporary directory
    with tempfile.TemporaryDirectory() as temp_dir:
        print("Temporary directory created:", temp_dir)

        # Fixed settings the UI does not expose
        num_frames = 16
        resolution = 576
        frames_overlap = 6
        noise_aug_strength = 0
        sample_stride = 2

        # Build the config structure consumed by inference.py; the remaining
        # values come straight from the UI (defaults: 25 steps, guidance
        # scale 2.0, 16 fps, seed 42)
        data = {
            'base_model_path': 'stabilityai/stable-video-diffusion-img2vid-xt-1-1',
            'ckpt_path': f'models/{checkpoint_version}',
            'test_case': [
                {
                    'ref_video_path': ref_video_in,
                    'ref_image_path': ref_image_in,
                    'num_frames': num_frames,
                    'resolution': resolution,
                    'frames_overlap': frames_overlap,
                    'num_inference_steps': num_inference_steps,
                    'noise_aug_strength': noise_aug_strength,
                    'guidance_scale': guidance_scale,
                    'sample_stride': sample_stride,
                    'fps': output_frames_per_second,
                    'seed': seed
                }
            ]
        }

        # Write the config to a YAML file inside the temporary directory
        file_path = os.path.join(temp_dir, 'config.yaml')
        with open(file_path, 'w') as file:
            yaml.dump(data, file, default_flow_style=False)
        print("YAML file 'config.yaml' created successfully at", file_path)
        # Execute the inference command as a subprocess
        command = ['python', 'inference.py', '--inference_config', file_path]
        process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1)

        # Drain stdout and stderr on separate threads so neither pipe blocks
        stdout_thread = threading.Thread(target=stream_output, args=(process.stdout,))
        stderr_thread = threading.Thread(target=stream_output, args=(process.stderr,))
        stdout_thread.start()
        stderr_thread.start()

        # Wait for the process to complete and the threads to finish
        process.wait()
        stdout_thread.join()
        stderr_thread.join()

        print("Inference script finished with return code:", process.returncode)

        # Print the outputs directory contents
        print_directory_contents('./outputs')

    # Return the generated .mp4, or None if inference produced nothing
    mp4_file_path = check_for_mp4_in_outputs()
    print(mp4_file_path)
    return mp4_file_path
# Defined before the layout so gr.Examples can reference it; rendered further
# down via output_video.render()
output_video = gr.Video(label="Output Video")
css = """ | |
div#warning-duplicate { | |
background-color: #ebf5ff; | |
padding: 0 16px 16px; | |
margin: 20px 0; | |
color: #030303!important; | |
} | |
div#warning-duplicate > .gr-prose > h2, div#warning-duplicate > .gr-prose > p { | |
color: #0f4592!important; | |
} | |
div#warning-duplicate strong { | |
color: #0f4592; | |
} | |
p.actions { | |
display: flex; | |
align-items: center; | |
margin: 20px 0; | |
} | |
div#warning-duplicate .actions a { | |
display: inline-block; | |
margin-right: 10px; | |
} | |
div#warning-setgpu { | |
background-color: #fff4eb; | |
padding: 0 16px 16px; | |
margin: 20px 0; | |
color: #030303!important; | |
} | |
div#warning-setgpu > .gr-prose > h2, div#warning-setgpu > .gr-prose > p { | |
color: #92220f!important; | |
} | |
div#warning-setgpu a, div#warning-setgpu b { | |
color: #91230f; | |
} | |
div#warning-setgpu p.actions > a { | |
display: inline-block; | |
background: #1f1f23; | |
border-radius: 40px; | |
padding: 6px 24px; | |
color: antiquewhite; | |
text-decoration: none; | |
font-weight: 600; | |
font-size: 1.2em; | |
} | |
div#warning-ready { | |
background-color: #ecfdf5; | |
padding: 0 16px 16px; | |
margin: 20px 0; | |
color: #030303!important; | |
} | |
div#warning-ready > .gr-prose > h2, div#warning-ready > .gr-prose > p { | |
color: #057857!important; | |
} | |
.custom-color { | |
color: #030303 !important; | |
} | |
""" | |
with gr.Blocks(css=css) as demo:
    with gr.Column():
        gr.Markdown("# MimicMotion")
        gr.Markdown("High-quality human motion video generation with confidence-aware pose guidance")
        gr.HTML("""
        1
        """)
        with gr.Row():
            with gr.Column():
                if is_shared_ui:
                    top_description = gr.HTML(f'''
                    2
                    ''', elem_id="warning-duplicate")
                else:
                    if is_gpu_associated:
                        top_description = gr.HTML(f'''
                        2
                        ''', elem_id="warning-ready")
                    else:
                        top_description = gr.HTML(f'''
                        2
                        ''', elem_id="warning-setgpu")
                with gr.Row():
                    ref_image_in = gr.Image(label="Person Image Reference", type="filepath")
                    ref_video_in = gr.Video(label="Person Video Reference")
                with gr.Accordion("Advanced Settings", open=False):
                    num_inference_steps = gr.Slider(label="num inference steps", minimum=12, maximum=50, value=25, step=1, interactive=available_property)
                    guidance_scale = gr.Slider(label="guidance scale", minimum=0.1, maximum=10, value=2, step=0.1, interactive=available_property)
                    with gr.Row():
                        output_frames_per_second = gr.Slider(label="fps", minimum=1, maximum=60, value=16, step=1, interactive=available_property)
                        seed = gr.Number(label="Seed", value=42, interactive=available_property)
                    checkpoint_version = gr.Dropdown(label="Checkpoint Version", choices=["MimicMotion_1.pth", "MimicMotion_1-1.pth"], value="MimicMotion_1.pth", interactive=available_property, filterable=False)
                submit_btn = gr.Button("Submit", interactive=available_property)
                gr.Examples(
                    examples=[
                        ["./examples/demo1.jpg", "./examples/preview_1.mp4"]
                    ],
                    fn=load_examples,
                    inputs=[ref_image_in, ref_video_in],
                    outputs=[output_video],
                    run_on_click=True,
                    cache_examples=False
                )
                output_video.render()

    submit_btn.click(
        fn=infer,
        inputs=[ref_image_in, ref_video_in, num_inference_steps, guidance_scale, output_frames_per_second, seed, checkpoint_version],
        outputs=[output_video]
    )
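# Note (added comment, not in the original app): show_error=False below hides
# tracebacks from the web UI; set it to True when debugging.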
demo.launch(show_api=False, show_error=False)