# tts-service/rvc/lib/tools/model_download.py
import os
import re
import sys
from urllib.parse import parse_qs, unquote, urlencode, urlparse

import requests
import wget
from bs4 import BeautifulSoup

# Make the project root importable before loading the bundled gdown module.
now_dir = os.getcwd()
sys.path.append(now_dir)

from rvc.lib.tools import gdown
def find_folder_parent(search_dir, folder_name):
    """Return the absolute path of the first directory containing `folder_name`."""
    for dirpath, dirnames, _ in os.walk(search_dir):
        if folder_name in dirnames:
            return os.path.abspath(dirpath)
    return None

# The parent of the "logs" folder is treated as the app root; downloads are
# staged in a "zips" subfolder next to it.
file_path = find_folder_parent(now_dir, "logs")
zips_path = os.path.join(file_path, "zips")
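# download_from_url dispatches on the host found in `url` and returns
# "downloaded" on success, "too much use" / "private link" for two known
# Google Drive failure modes, or None on any other failure.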
def download_from_url(url):
    os.makedirs(zips_path, exist_ok=True)
    if url != "":
        # Google Drive: extract the file id and delegate to gdown.
        if "drive.google.com" in url:
            if "file/d/" in url:
                file_id = url.split("file/d/")[1].split("/")[0]
            elif "id=" in url:
                file_id = url.split("id=")[1].split("&")[0]
            else:
                return None
            if file_id:
                os.chdir(zips_path)
                try:
                    gdown.download(
                        f"https://drive.google.com/uc?id={file_id}",
                        quiet=True,
                        fuzzy=True,
                    )
                except Exception as error:
                    error_message = f"An error occurred downloading the file: {error}"
                    if "Too many users have viewed or downloaded this file recently" in error_message:
                        os.chdir(now_dir)
                        return "too much use"
                    elif "Cannot retrieve the public link of the file." in error_message:
                        os.chdir(now_dir)
                        return "private link"
                    else:
                        print(error_message)
                        os.chdir(now_dir)
                        return None
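        # Yandex Disk: resolve the public link to a direct download URL
        # through the cloud API, then save the file under zips_path.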
elif "disk.yandex.ru" in url:
base_url = "https://cloud-api.yandex.net/v1/disk/public/resources/download?"
public_key = url
final_url = base_url + urlencode(dict(public_key=public_key))
response = requests.get(final_url)
download_url = response.json()["href"]
download_response = requests.get(download_url)
if download_response.status_code == 200:
filename = parse_qs(urlparse(unquote(download_url)).query).get("filename", [""])[0]
if filename:
os.chdir(zips_path)
with open(filename, "wb") as f:
f.write(download_response.content)
else:
print("Failed to get filename from URL.")
return None
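        # Pixeldrain: fetch through the public file API; the filename comes
        # from the Content-Disposition header when present.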
elif "pixeldrain.com" in url:
try:
file_id = url.split("pixeldrain.com/u/")[1]
os.chdir(zips_path)
print(file_id)
response = requests.get(f"https://pixeldrain.com/api/file/{file_id}")
if response.status_code == 200:
file_name = response.headers.get("Content-Disposition").split("filename=")[-1].strip('";')
os.makedirs(zips_path, exist_ok=True)
with open(os.path.join(zips_path, file_name), "wb") as newfile:
newfile.write(response.content)
os.chdir(file_path)
return "downloaded"
else:
os.chdir(file_path)
return None
except Exception as error:
print(f"An error occurred downloading the file: {error}")
os.chdir(file_path)
return None
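        # Discord CDN: download the attachment and keep its URL basename.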
elif "cdn.discordapp.com" in url:
file = requests.get(url)
os.chdir(zips_path)
if file.status_code == 200:
name = url.split("/")
with open(os.path.join(name[-1]), "wb") as newfile:
newfile.write(file.content)
else:
return None
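        # Hugging Face file URLs: rewrite /blob/ to /resolve/ and stream the
        # file to disk with a simple text progress bar.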
elif "/blob/" in url or "/resolve/" in url:
os.chdir(zips_path)
if "/blob/" in url:
url = url.replace("/blob/", "/resolve/")
response = requests.get(url, stream=True)
if response.status_code == 200:
content_disposition = six.moves.urllib_parse.unquote(response.headers["Content-Disposition"])
m = re.search(r'filename="([^"]+)"', content_disposition)
file_name = m.groups()[0]
file_name = file_name.replace(os.path.sep, "_")
total_size_in_bytes = int(response.headers.get("content-length", 0))
block_size = 1024
progress_bar_length = 50
progress = 0
with open(os.path.join(zips_path, file_name), "wb") as file:
for data in response.iter_content(block_size):
file.write(data)
progress += len(data)
progress_percent = int((progress / total_size_in_bytes) * 100)
num_dots = int((progress / total_size_in_bytes) * progress_bar_length)
progress_bar = "[" + "." * num_dots + " " * (progress_bar_length - num_dots) + "]"
print(
f"{progress_percent}% {progress_bar} {progress}/{total_size_in_bytes} ",
end="\r",
)
if progress_percent == 100:
print("\n")
else:
os.chdir(now_dir)
return None
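        # Hugging Face repo pages (/tree/main): scrape the page for the first
        # .zip link and download it via its /resolve/ URL.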
elif "/tree/main" in url:
os.chdir(zips_path)
response = requests.get(url)
soup = BeautifulSoup(response.content, "html.parser")
temp_url = ""
for link in soup.find_all("a", href=True):
if link["href"].endswith(".zip"):
temp_url = link["href"]
break
if temp_url:
url = temp_url
url = url.replace("blob", "resolve")
if "huggingface.co" not in url:
url = "https://huggingface.co" + url
wget.download(url)
else:
os.chdir(now_dir)
return None
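        # applio.org model ids: look up the model record in the public
        # Supabase REST API (the anon apikey below ships with the source),
        # then recurse on the stored download link.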
elif "applio.org" in url:
parts = url.split("/")
id_with_query = parts[-1]
id_parts = id_with_query.split("?")
id_number = id_parts[0]
url = "https://cjtfqzjfdimgpvpwhzlv.supabase.co/rest/v1/models"
headers = {
"apikey": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6ImNqdGZxempmZGltZ3B2cHdoemx2Iiwicm9sZSI6ImFub24iLCJpYXQiOjE2OTUxNjczODgsImV4cCI6MjAxMDc0MzM4OH0.7z5WMIbjR99c2Ooc0ma7B_FyGq10G8X-alkCYTkKR10"
}
params = {"id": f"eq.{id_number}"}
response = requests.get(url, headers=headers, params=params)
if response.status_code == 200:
json_response = response.json()
print(json_response)
if json_response:
link = json_response[0]["link"]
verify = download_from_url(link)
if verify == "downloaded":
return "downloaded"
else:
return None
else:
return None
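        # Anything else: try a plain wget download into zips_path.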
        else:
            try:
                os.chdir(zips_path)
                wget.download(url)
            except Exception as error:
                os.chdir(now_dir)
                print(f"An error occurred downloading the file: {error}")
                return None
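        # Normalize downloaded filenames: replace dots inside the base name
        # with underscores so later unzip/rename steps behave predictably.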
        for current_path, _, downloaded_files in os.walk(zips_path):
            for file_name in downloaded_files:
                base_name, extension = os.path.splitext(file_name)
                new_name = base_name.replace(".", "_") + extension
                os.rename(
                    os.path.join(current_path, file_name),
                    os.path.join(current_path, new_name),
                )
        os.chdir(now_dir)
        return "downloaded"
    os.chdir(now_dir)
    return None
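
# Minimal usage sketch (an assumption, not part of the original module): the
# module exposes download_from_url for import elsewhere; this guard only makes
# ad-hoc testing from the command line possible.
if __name__ == "__main__":
    if len(sys.argv) > 1:
        print(download_from_url(sys.argv[1]))
    else:
        print("Usage: python model_download.py <url>")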