import os import re import sys from urllib.parse import parse_qs, unquote, urlencode, urlparse import requests import six import wget from bs4 import BeautifulSoup now_dir = os.getcwd() sys.path.append(now_dir) from rvc.lib.tools import gdown def find_folder_parent(search_dir, folder_name): for dirpath, dirnames, _ in os.walk(search_dir): if folder_name in dirnames: return os.path.abspath(dirpath) return None file_path = find_folder_parent(now_dir, "logs") zips_path = os.path.join(file_path, "zips") def download_from_url(url): os.makedirs(zips_path, exist_ok=True) if url != "": if "drive.google.com" in url: if "file/d/" in url: file_id = url.split("file/d/")[1].split("/")[0] elif "id=" in url: file_id = url.split("id=")[1].split("&")[0] else: return None if file_id: os.chdir(zips_path) try: gdown.download( f"https://drive.google.com/uc?id={file_id}", quiet=True, fuzzy=True, ) except Exception as error: error_message = str(f"An error occurred downloading the file: {error}") if "Too many users have viewed or downloaded this file recently" in error_message: os.chdir(now_dir) return "too much use" elif "Cannot retrieve the public link of the file." in error_message: os.chdir(now_dir) return "private link" else: print(error_message) os.chdir(now_dir) return None elif "disk.yandex.ru" in url: base_url = "https://cloud-api.yandex.net/v1/disk/public/resources/download?" public_key = url final_url = base_url + urlencode(dict(public_key=public_key)) response = requests.get(final_url) download_url = response.json()["href"] download_response = requests.get(download_url) if download_response.status_code == 200: filename = parse_qs(urlparse(unquote(download_url)).query).get("filename", [""])[0] if filename: os.chdir(zips_path) with open(filename, "wb") as f: f.write(download_response.content) else: print("Failed to get filename from URL.") return None elif "pixeldrain.com" in url: try: file_id = url.split("pixeldrain.com/u/")[1] os.chdir(zips_path) print(file_id) response = requests.get(f"https://pixeldrain.com/api/file/{file_id}") if response.status_code == 200: file_name = response.headers.get("Content-Disposition").split("filename=")[-1].strip('";') os.makedirs(zips_path, exist_ok=True) with open(os.path.join(zips_path, file_name), "wb") as newfile: newfile.write(response.content) os.chdir(file_path) return "downloaded" else: os.chdir(file_path) return None except Exception as error: print(f"An error occurred downloading the file: {error}") os.chdir(file_path) return None elif "cdn.discordapp.com" in url: file = requests.get(url) os.chdir(zips_path) if file.status_code == 200: name = url.split("/") with open(os.path.join(name[-1]), "wb") as newfile: newfile.write(file.content) else: return None elif "/blob/" in url or "/resolve/" in url: os.chdir(zips_path) if "/blob/" in url: url = url.replace("/blob/", "/resolve/") response = requests.get(url, stream=True) if response.status_code == 200: content_disposition = six.moves.urllib_parse.unquote(response.headers["Content-Disposition"]) m = re.search(r'filename="([^"]+)"', content_disposition) file_name = m.groups()[0] file_name = file_name.replace(os.path.sep, "_") total_size_in_bytes = int(response.headers.get("content-length", 0)) block_size = 1024 progress_bar_length = 50 progress = 0 with open(os.path.join(zips_path, file_name), "wb") as file: for data in response.iter_content(block_size): file.write(data) progress += len(data) progress_percent = int((progress / total_size_in_bytes) * 100) num_dots = int((progress / total_size_in_bytes) * progress_bar_length) progress_bar = "[" + "." * num_dots + " " * (progress_bar_length - num_dots) + "]" print( f"{progress_percent}% {progress_bar} {progress}/{total_size_in_bytes} ", end="\r", ) if progress_percent == 100: print("\n") else: os.chdir(now_dir) return None elif "/tree/main" in url: os.chdir(zips_path) response = requests.get(url) soup = BeautifulSoup(response.content, "html.parser") temp_url = "" for link in soup.find_all("a", href=True): if link["href"].endswith(".zip"): temp_url = link["href"] break if temp_url: url = temp_url url = url.replace("blob", "resolve") if "huggingface.co" not in url: url = "https://huggingface.co" + url wget.download(url) else: os.chdir(now_dir) return None elif "applio.org" in url: parts = url.split("/") id_with_query = parts[-1] id_parts = id_with_query.split("?") id_number = id_parts[0] url = "https://cjtfqzjfdimgpvpwhzlv.supabase.co/rest/v1/models" headers = { "apikey": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6ImNqdGZxempmZGltZ3B2cHdoemx2Iiwicm9sZSI6ImFub24iLCJpYXQiOjE2OTUxNjczODgsImV4cCI6MjAxMDc0MzM4OH0.7z5WMIbjR99c2Ooc0ma7B_FyGq10G8X-alkCYTkKR10" } params = {"id": f"eq.{id_number}"} response = requests.get(url, headers=headers, params=params) if response.status_code == 200: json_response = response.json() print(json_response) if json_response: link = json_response[0]["link"] verify = download_from_url(link) if verify == "downloaded": return "downloaded" else: return None else: return None else: try: os.chdir(zips_path) wget.download(url) except Exception as error: os.chdir(now_dir) print(f"An error occurred downloading the file: {error}") return None for currentPath, _, zipFiles in os.walk(zips_path): for Files in zipFiles: filePart = Files.split(".") extensionFile = filePart[len(filePart) - 1] filePart.pop() nameFile = "_".join(filePart) realPath = os.path.join(currentPath, Files) os.rename(realPath, nameFile + "." + extensionFile) os.chdir(now_dir) return "downloaded" os.chdir(now_dir) return None