import os, zipfile, rarfile, shutil, subprocess, shlex, sys # noqa from .logging_setup import logger from urllib.parse import urlparse from IPython.utils import capture import re import soundfile as sf import numpy as np VIDEO_EXTENSIONS = [ ".mp4", ".avi", ".mov", ".mkv", ".wmv", ".flv", ".webm", ".m4v", ".mpeg", ".mpg", ".3gp" ] AUDIO_EXTENSIONS = [ ".mp3", ".wav", ".aiff", ".aif", ".flac", ".aac", ".ogg", ".wma", ".m4a", ".alac", ".pcm", ".opus", ".ape", ".amr", ".ac3", ".vox", ".caf" ] SUBTITLE_EXTENSIONS = [ ".srt", ".vtt", ".ass" ] def run_command(command): logger.debug(command) if isinstance(command, str): command = shlex.split(command) sub_params = { "stdout": subprocess.PIPE, "stderr": subprocess.PIPE, "creationflags": subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0, } process_command = subprocess.Popen(command, **sub_params) output, errors = process_command.communicate() if ( process_command.returncode != 0 ): # or not os.path.exists(mono_path) or os.path.getsize(mono_path) == 0: logger.error("Error comnand") raise Exception(errors.decode()) def write_chunked( file, data, samplerate, subtype=None, endian=None, format=None, closefd=True, chunk_size=0x1000 ): data = np.asarray(data) if data.ndim == 1: channels = 1 else: channels = data.shape[1] with sf.SoundFile( file, 'w', samplerate, channels, subtype, endian, format, closefd ) as f: num_chunks = (len(data) + chunk_size - 1) // chunk_size for chunk in np.array_split(data, num_chunks, axis=0): f.write(chunk) def print_tree_directory(root_dir, indent=""): if not os.path.exists(root_dir): logger.error(f"{indent} Invalid directory or file: {root_dir}") return items = os.listdir(root_dir) for index, item in enumerate(sorted(items)): item_path = os.path.join(root_dir, item) is_last_item = index == len(items) - 1 if os.path.isfile(item_path) and item_path.endswith(".zip"): with zipfile.ZipFile(item_path, "r") as zip_file: print( f"{indent}{'└──' if is_last_item else '├──'} {item} (zip file)" ) zip_contents = zip_file.namelist() for zip_item in sorted(zip_contents): print( f"{indent}{' ' if is_last_item else '│ '}{zip_item}" ) else: print(f"{indent}{'└──' if is_last_item else '├──'} {item}") if os.path.isdir(item_path): new_indent = indent + (" " if is_last_item else "│ ") print_tree_directory(item_path, new_indent) def upload_model_list(): weight_root = "weights" models = [] for name in os.listdir(weight_root): if name.endswith(".pth"): models.append("weights/" + name) if models: logger.debug(models) index_root = "logs" index_paths = [None] for name in os.listdir(index_root): if name.endswith(".index"): index_paths.append("logs/" + name) if index_paths: logger.debug(index_paths) return models, index_paths def manual_download(url, dst): if "drive.google" in url: logger.info("Drive url") if "folders" in url: logger.info("folder") os.system(f'gdown --folder "{url}" -O {dst} --fuzzy -c') else: logger.info("single") os.system(f'gdown "{url}" -O {dst} --fuzzy -c') elif "huggingface" in url: logger.info("HuggingFace url") if "/blob/" in url or "/resolve/" in url: if "/blob/" in url: url = url.replace("/blob/", "/resolve/") download_manager(url=url, path=dst, overwrite=True, progress=True) else: os.system(f"git clone {url} {dst+'repo/'}") elif "http" in url: logger.info("URL") download_manager(url=url, path=dst, overwrite=True, progress=True) elif os.path.exists(url): logger.info("Path") copy_files(url, dst) else: logger.error(f"No valid URL: {url}") def download_list(text_downloads): try: urls = [elem.strip() for elem in text_downloads.split(",")] except Exception as error: raise ValueError(f"No valid URL. {str(error)}") create_directories(["downloads", "logs", "weights"]) path_download = "downloads/" for url in urls: manual_download(url, path_download) # Tree print("####################################") print_tree_directory("downloads", indent="") print("####################################") # Place files select_zip_and_rar_files("downloads/") models, _ = upload_model_list() # hf space models files delete remove_directory_contents("downloads/repo") return f"Downloaded = {models}" def select_zip_and_rar_files(directory_path="downloads/"): # filter zip_files = [] rar_files = [] for file_name in os.listdir(directory_path): if file_name.endswith(".zip"): zip_files.append(file_name) elif file_name.endswith(".rar"): rar_files.append(file_name) # extract for file_name in zip_files: file_path = os.path.join(directory_path, file_name) with zipfile.ZipFile(file_path, "r") as zip_ref: zip_ref.extractall(directory_path) for file_name in rar_files: file_path = os.path.join(directory_path, file_name) with rarfile.RarFile(file_path, "r") as rar_ref: rar_ref.extractall(directory_path) # set in path def move_files_with_extension(src_dir, extension, destination_dir): for root, _, files in os.walk(src_dir): for file_name in files: if file_name.endswith(extension): source_file = os.path.join(root, file_name) destination = os.path.join(destination_dir, file_name) shutil.move(source_file, destination) move_files_with_extension(directory_path, ".index", "logs/") move_files_with_extension(directory_path, ".pth", "weights/") return "Download complete" def is_file_with_extensions(string_path, extensions): return any(string_path.lower().endswith(ext) for ext in extensions) def is_video_file(string_path): return is_file_with_extensions(string_path, VIDEO_EXTENSIONS) def is_audio_file(string_path): return is_file_with_extensions(string_path, AUDIO_EXTENSIONS) def is_subtitle_file(string_path): return is_file_with_extensions(string_path, SUBTITLE_EXTENSIONS) def get_directory_files(directory): audio_files = [] video_files = [] sub_files = [] for item in os.listdir(directory): item_path = os.path.join(directory, item) if os.path.isfile(item_path): if is_audio_file(item_path): audio_files.append(item_path) elif is_video_file(item_path): video_files.append(item_path) elif is_subtitle_file(item_path): sub_files.append(item_path) logger.info( f"Files in path ({directory}): " f"{str(audio_files + video_files + sub_files)}" ) return audio_files, video_files, sub_files def get_valid_files(paths): valid_paths = [] for path in paths: if os.path.isdir(path): audio_files, video_files, sub_files = get_directory_files(path) valid_paths.extend(audio_files) valid_paths.extend(video_files) valid_paths.extend(sub_files) else: valid_paths.append(path) return valid_paths def extract_video_links(link): params_dlp = {"quiet": False, "no_warnings": True, "noplaylist": False} try: from yt_dlp import YoutubeDL with capture.capture_output() as cap: with YoutubeDL(params_dlp) as ydl: info_dict = ydl.extract_info( # noqa link, download=False, process=True ) urls = re.findall(r'\[youtube\] Extracting URL: (.*?)\n', cap.stdout) logger.info(f"List of videos in ({link}): {str(urls)}") del cap except Exception as error: logger.error(f"{link} >> {str(error)}") urls = [link] return urls def get_link_list(urls): valid_links = [] for url_video in urls: if "youtube.com" in url_video and "/watch?v=" not in url_video: url_links = extract_video_links(url_video) valid_links.extend(url_links) else: valid_links.append(url_video) return valid_links # ===================================== # Download Manager # ===================================== def load_file_from_url( url: str, model_dir: str, file_name: str | None = None, overwrite: bool = False, progress: bool = True, ) -> str: """Download a file from `url` into `model_dir`, using the file present if possible. Returns the path to the downloaded file. """ os.makedirs(model_dir, exist_ok=True) if not file_name: parts = urlparse(url) file_name = os.path.basename(parts.path) cached_file = os.path.abspath(os.path.join(model_dir, file_name)) # Overwrite if os.path.exists(cached_file): if overwrite or os.path.getsize(cached_file) == 0: remove_files(cached_file) # Download if not os.path.exists(cached_file): logger.info(f'Downloading: "{url}" to {cached_file}\n') from torch.hub import download_url_to_file download_url_to_file(url, cached_file, progress=progress) else: logger.debug(cached_file) return cached_file def friendly_name(file: str): if file.startswith("http"): file = urlparse(file).path file = os.path.basename(file) model_name, extension = os.path.splitext(file) return model_name, extension def download_manager( url: str, path: str, extension: str = "", overwrite: bool = False, progress: bool = True, ): url = url.strip() name, ext = friendly_name(url) name += ext if not extension else f".{extension}" if url.startswith("http"): filename = load_file_from_url( url=url, model_dir=path, file_name=name, overwrite=overwrite, progress=progress, ) else: filename = path return filename # ===================================== # File management # ===================================== # only remove files def remove_files(file_list): if isinstance(file_list, str): file_list = [file_list] for file in file_list: if os.path.exists(file): os.remove(file) def remove_directory_contents(directory_path): """ Removes all files and subdirectories within a directory. Parameters: directory_path (str): Path to the directory whose contents need to be removed. """ if os.path.exists(directory_path): for filename in os.listdir(directory_path): file_path = os.path.join(directory_path, filename) try: if os.path.isfile(file_path): os.remove(file_path) elif os.path.isdir(file_path): shutil.rmtree(file_path) except Exception as e: logger.error(f"Failed to delete {file_path}. Reason: {e}") logger.info(f"Content in '{directory_path}' removed.") else: logger.error(f"Directory '{directory_path}' does not exist.") # Create directory if not exists def create_directories(directory_path): if isinstance(directory_path, str): directory_path = [directory_path] for one_dir_path in directory_path: if not os.path.exists(one_dir_path): os.makedirs(one_dir_path) logger.debug(f"Directory '{one_dir_path}' created.") def move_files(source_dir, destination_dir, extension=""): """ Moves file(s) from the source path to the destination path. Parameters: source_dir (str): Path to the source directory. destination_dir (str): Path to the destination directory. extension (str): Only move files with this extension. """ create_directories(destination_dir) for filename in os.listdir(source_dir): source_path = os.path.join(source_dir, filename) destination_path = os.path.join(destination_dir, filename) if extension and not filename.endswith(extension): continue os.replace(source_path, destination_path) def copy_files(source_path, destination_path): """ Copies a file or multiple files from a source path to a destination path. Parameters: source_path (str or list): Path or list of paths to the source file(s) or directory. destination_path (str): Path to the destination directory. """ create_directories(destination_path) if isinstance(source_path, str): source_path = [source_path] if os.path.isdir(source_path[0]): # Copy all files from the source directory to the destination directory base_path = source_path[0] source_path = os.listdir(source_path[0]) source_path = [ os.path.join(base_path, file_name) for file_name in source_path ] for one_source_path in source_path: if os.path.exists(one_source_path): shutil.copy2(one_source_path, destination_path) logger.debug( f"File '{one_source_path}' copied to '{destination_path}'." ) else: logger.error(f"File '{one_source_path}' does not exist.") def rename_file(current_name, new_name): file_directory = os.path.dirname(current_name) if os.path.exists(current_name): dir_new_name_file = os.path.join(file_directory, new_name) os.rename(current_name, dir_new_name_file) logger.debug(f"File '{current_name}' renamed to '{new_name}'.") return dir_new_name_file else: logger.error(f"File '{current_name}' does not exist.") return None