Spaces:
Running
Running
import os | |
import shutil | |
import gradio as gr | |
import zipfile | |
import tempfile | |
from infer.modules.train.preprocess import PreProcess | |
from typing import Tuple | |
from huggingface_hub import snapshot_download | |
def extract_audio_files(zip_file: str, target_dir: str) -> list[str]:
    """Extract ``zip_file`` into ``target_dir`` and list its top-level audio files.

    Only regular files sitting directly in ``target_dir`` are reported; audio
    nested in sub-folders of the archive is extracted but not returned.
    Extensions are matched case-insensitively, so ``TAKE1.WAV`` is accepted
    just like ``take1.wav``.

    Args:
        zip_file: Path to the zip archive to extract.
        target_dir: Directory that receives the archive contents.

    Returns:
        Paths of the ``.wav`` / ``.mp3`` / ``.ogg`` files found, sorted by
        file name.

    Raises:
        gr.Error: If no audio file is found at the top level.
    """
    audio_suffixes = (".wav", ".mp3", ".ogg")

    with zipfile.ZipFile(zip_file, "r") as archive:
        archive.extractall(target_dir)

    # os.listdir() returns entries in arbitrary order; sort so the result is
    # reproducible across runs and file systems.
    candidates = (
        os.path.join(target_dir, name)
        for name in sorted(os.listdir(target_dir))
        if name.lower().endswith(audio_suffixes)
    )
    # A directory called e.g. "takes.ogg" is not an audio file.
    audio_files = [path for path in candidates if os.path.isfile(path)]

    if not audio_files:
        raise gr.Error("No audio files found at the top level of the zip file")
    return audio_files
def create_new_expdir(zip_file: str) -> Tuple[str, str]:
    """Create a new experiment directory from an uploaded audio archive.

    A fresh temporary directory is created, the archive is extracted into its
    ``_data`` sub-directory, and ``PreProcess`` is run over that data.

    Args:
        zip_file: Path to the zip archive containing the training audio.

    Returns:
        A ``(exp_dir, message)`` tuple: the path of the new experiment
        directory, and a summary line followed by the preprocessing log.

    Raises:
        gr.Error: Propagated from ``extract_audio_files`` when the archive has
            no top-level audio files.
    """
    temp_dir = tempfile.mkdtemp()
    print(f"Using exp dir: {temp_dir}")

    try:
        data_dir = os.path.join(temp_dir, "_data")
        os.makedirs(data_dir)
        audio_files = extract_audio_files(zip_file, data_dir)

        # NOTE(review): the positional arguments are presumably the sample
        # rate (40 kHz), experiment dir, slice length and a "no parallel"
        # flag, and `4` the worker count -- confirm against PreProcess.
        pp = PreProcess(40000, temp_dir, 3.0, False)
        pp.pipeline_mp_inp_dir(data_dir, 4)

        # Rewind before reading so the whole log is returned, not just
        # whatever follows the current file position.
        pp.logfile.seek(0)
        log = pp.logfile.read()
    except Exception:
        # mkdtemp() leaves deletion to the caller; without this, every failed
        # upload would leave its extracted files behind in the temp folder.
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise

    return temp_dir, f"Preprocessed {len(audio_files)} audio files.\n{log}"
def restore_expdir(zip: str) -> str:
    """Unpack an experiment archive into a fresh temporary directory.

    Args:
        zip: Path to the archive to unpack. The name shadows the ``zip``
            builtin but is kept because it is part of the function's
            interface.

    Returns:
        Path of the temporary directory holding the unpacked experiment.

    Raises:
        Exception: Whatever ``shutil.unpack_archive`` raises (for example
            ``shutil.ReadError`` for an unreadable archive) is re-raised
            unchanged after the temporary directory has been removed.
    """
    exp_dir = tempfile.mkdtemp()
    try:
        shutil.unpack_archive(zip, exp_dir)
    except Exception:
        # mkdtemp() leaves deletion to the caller; do not leak a directory
        # for every archive that fails to unpack.
        shutil.rmtree(exp_dir, ignore_errors=True)
        raise
    return exp_dir
def restore_from_huggingface(repo: str, token: str) -> str:
    """Download a Hugging Face repo snapshot into a new temporary directory.

    Args:
        repo: Repository id, e.g. ``username/repo``. Surrounding whitespace is
            ignored.
        token: Optional access token. It is forwarded only when it starts
            with ``hf_``; otherwise ``None`` is passed and authentication is
            left to ``snapshot_download``'s default behaviour.

    Returns:
        Path of the downloaded snapshot: a fresh temporary directory joined
        with the lower-cased repo id.

    Raises:
        gr.Error: If ``repo`` is empty.
        Exception: Whatever ``snapshot_download`` raises is re-raised after
            the temporary directory has been removed.
    """
    # Values pasted into the text boxes often carry stray whitespace, and an
    # unset field may arrive as None.
    repo = (repo or "").strip()
    token = (token or "").strip()
    if not repo:
        raise gr.Error("Please enter a Hugging Face repo (username/repo)")

    work_dir = tempfile.mkdtemp()
    exp_dir = os.path.join(work_dir, repo.lower())
    try:
        snapshot_download(
            repo, local_dir=exp_dir, token=token if token.startswith("hf_") else None
        )
    except Exception:
        # mkdtemp() leaves deletion to the caller; drop the partial download.
        shutil.rmtree(work_dir, ignore_errors=True)
        raise
    return exp_dir
def set_dir(dir_val: str) -> str:
    """Resolve a user-supplied experiment directory, confined to ``/tmp``.

    Relative values are interpreted relative to ``/tmp``. The path is
    normalised before it is checked, so neither an absolute path outside
    ``/tmp`` (``os.path.join`` would silently discard the ``/tmp`` prefix) nor
    ``..`` segments can point the experiment directory elsewhere. Symbolic
    links are not resolved.

    Args:
        dir_val: Absolute path under ``/tmp`` or a path relative to ``/tmp``.

    Returns:
        The normalised directory path.

    Raises:
        gr.Error: If the path lies outside ``/tmp`` or is not an existing
            directory.
    """
    root = "/tmp"
    requested = (dir_val or "").strip()

    if os.path.isabs(requested):
        candidate = os.path.normpath(requested)
    else:
        candidate = os.path.normpath(os.path.join(root, requested))

    # Compare whole path components: a plain prefix test would accept
    # "/tmp/../etc" and sibling names such as "/tmpfoo".
    if os.path.commonpath([root, candidate]) != root:
        raise gr.Error("Directory must be inside /tmp")
    if not os.path.isdir(candidate):
        raise gr.Error("Directory does not exist")
    return candidate
class SetupTab:
    """Gradio tab for creating, restoring or selecting an experiment directory.

    Call ``ui()`` inside a Gradio layout context to create the components,
    then ``build()`` to attach the button callbacks.
    """

    # NOTE(review): the source this class was recovered from had lost its
    # indentation, so the nesting of the Row/Column contexts in ui() is a
    # reconstruction -- confirm the layout against the running app.

    def __init__(self):
        """Create the tab; components are only instantiated by ``ui()``."""

    def ui(self):
        """Instantiate the tab's components and store them on ``self``."""
        gr.Markdown("# Setup Experiment")
        gr.Markdown(
            "You can upload a zip file containing audio files to start a new experiment, or upload an experiment directory zip file to restore an existing experiment.\n"
            "The suggested dataset size is > 5 minutes of audio."
        )

        # --- Start a new experiment from raw audio ---
        with gr.Row():
            with gr.Column():
                self.zip_file = gr.File(
                    label="Upload a zip file containing audio files for training",
                    # Extensions need the leading dot; a bare "zip" is
                    # interpreted as a file-type category, not an extension.
                    file_types=[".zip"],
                )
                self.preprocess_log = gr.Textbox(label="Log", lines=5)
                self.preprocess_btn = gr.Button(
                    value="Start New Experiment", variant="primary"
                )

        # --- Restore an experiment from an uploaded archive ---
        with gr.Row():
            self.restore_zip_file = gr.File(
                label="Upload the experiment directory zip file",
                file_types=[".zip"],
            )
            self.restore_btn = gr.Button(value="Restore Experiment", variant="primary")

        # --- Restore an experiment from a Hugging Face repo ---
        gr.Markdown("You can also restore from a Hugging Face repo.")
        with gr.Row():
            self.hf_repo = gr.Textbox(
                label="Restore from Hugging Face repo",
                placeholder="username/repo",
            )
            self.hf_token = gr.Textbox(
                label="Hugging Face token (optional)",
                placeholder="hf_...",
            )
            self.restore_hf_btn = gr.Button(value="Restore from Hugging Face")

        # --- Point at an experiment directory that already exists ---
        with gr.Row():
            self.dir_val = gr.Textbox(
                label="Manually set the experiment directory (don't touch it unless you know what you are doing)",
                placeholder="/tmp/...",
            )
            self.set_dir_btn = gr.Button(value="Set Directory")

    def build(self, exp_dir: gr.Textbox):
        """Attach the click handlers; every handler writes to ``exp_dir``.

        Must be called after ``ui()``, which creates the components used here.

        Args:
            exp_dir: Textbox that receives the experiment directory path
                returned by each handler.
        """
        self.preprocess_btn.click(
            fn=create_new_expdir,
            inputs=[self.zip_file],
            outputs=[exp_dir, self.preprocess_log],
        )
        self.restore_btn.click(
            fn=restore_expdir,
            inputs=[self.restore_zip_file],
            outputs=[exp_dir],
        )
        self.restore_hf_btn.click(
            fn=restore_from_huggingface,
            inputs=[self.hf_repo, self.hf_token],
            outputs=[exp_dir],
        )
        self.set_dir_btn.click(
            fn=set_dir,
            inputs=[self.dir_val],
            outputs=[exp_dir],
        )