tts-service / rvc /lib /tools /model_download.py
Jesus Lopez
feat: applio
a8c39f5
raw
history blame
15.5 kB
import os
import re
import six
import sys
import wget
import shutil
import zipfile
import requests
from bs4 import BeautifulSoup
from urllib.parse import unquote, urlencode, parse_qs, urlparse
now_dir = os.getcwd()
sys.path.append(now_dir)
from rvc.lib.utils import format_title
from rvc.lib.tools import gdown
def find_folder_parent(search_dir, folder_name):
for dirpath, dirnames, _ in os.walk(search_dir):
if folder_name in dirnames:
return os.path.abspath(dirpath)
return None
file_path = find_folder_parent(now_dir, "logs")
zips_path = os.path.join(file_path, "zips")
def search_pth_index(folder):
pth_paths = [
os.path.join(folder, file)
for file in os.listdir(folder)
if os.path.isfile(os.path.join(folder, file)) and file.endswith(".pth")
]
index_paths = [
os.path.join(folder, file)
for file in os.listdir(folder)
if os.path.isfile(os.path.join(folder, file)) and file.endswith(".index")
]
return pth_paths, index_paths
def get_mediafire_download_link(url):
response = requests.get(url)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
download_button = soup.find(
"a", {"class": "input popsok", "aria-label": "Download file"}
)
if download_button:
download_link = download_button.get("href")
return download_link
else:
return None
def download_from_url(url):
os.makedirs(zips_path, exist_ok=True)
if url != "":
if "drive.google.com" in url:
if "file/d/" in url:
file_id = url.split("file/d/")[1].split("/")[0]
elif "id=" in url:
file_id = url.split("id=")[1].split("&")[0]
else:
return None
if file_id:
os.chdir(zips_path)
try:
gdown.download(
f"https://drive.google.com/uc?id={file_id}",
quiet=True,
fuzzy=True,
)
except Exception as error:
error_message = str(
f"An error occurred downloading the file: {error}"
)
if (
"Too many users have viewed or downloaded this file recently"
in error_message
):
os.chdir(now_dir)
return "too much use"
elif (
"Cannot retrieve the public link of the file." in error_message
):
os.chdir(now_dir)
return "private link"
else:
print(error_message)
os.chdir(now_dir)
return None
elif "disk.yandex.ru" in url:
base_url = "https://cloud-api.yandex.net/v1/disk/public/resources/download?"
public_key = url
final_url = base_url + urlencode(dict(public_key=public_key))
response = requests.get(final_url)
download_url = response.json()["href"]
download_response = requests.get(download_url)
if download_response.status_code == 200:
filename = parse_qs(urlparse(unquote(download_url)).query).get(
"filename", [""]
)[0]
if filename:
os.chdir(zips_path)
with open(filename, "wb") as f:
f.write(download_response.content)
else:
print("Failed to get filename from URL.")
return None
elif "pixeldrain.com" in url:
try:
file_id = url.split("pixeldrain.com/u/")[1]
os.chdir(zips_path)
print(file_id)
response = requests.get(f"https://pixeldrain.com/api/file/{file_id}")
if response.status_code == 200:
file_name = (
response.headers.get("Content-Disposition")
.split("filename=")[-1]
.strip('";')
)
os.makedirs(zips_path, exist_ok=True)
with open(os.path.join(zips_path, file_name), "wb") as newfile:
newfile.write(response.content)
os.chdir(file_path)
return "downloaded"
else:
os.chdir(file_path)
return None
except Exception as error:
print(f"An error occurred downloading the file: {error}")
os.chdir(file_path)
return None
elif "cdn.discordapp.com" in url:
file = requests.get(url)
os.chdir(zips_path)
if file.status_code == 200:
name = url.split("/")
with open(os.path.join(name[-1]), "wb") as newfile:
newfile.write(file.content)
else:
return None
elif "/blob/" in url or "/resolve/" in url:
os.chdir(zips_path)
if "/blob/" in url:
url = url.replace("/blob/", "/resolve/")
response = requests.get(url, stream=True)
if response.status_code == 200:
content_disposition = six.moves.urllib_parse.unquote(
response.headers["Content-Disposition"]
)
m = re.search(r'filename="([^"]+)"', content_disposition)
file_name = m.groups()[0]
file_name = file_name.replace(os.path.sep, "_")
total_size_in_bytes = int(response.headers.get("content-length", 0))
block_size = 1024
progress_bar_length = 50
progress = 0
with open(os.path.join(zips_path, file_name), "wb") as file:
for data in response.iter_content(block_size):
file.write(data)
progress += len(data)
progress_percent = int((progress / total_size_in_bytes) * 100)
num_dots = int(
(progress / total_size_in_bytes) * progress_bar_length
)
progress_bar = (
"["
+ "." * num_dots
+ " " * (progress_bar_length - num_dots)
+ "]"
)
print(
f"{progress_percent}% {progress_bar} {progress}/{total_size_in_bytes} ",
end="\r",
)
if progress_percent == 100:
print("\n")
else:
os.chdir(now_dir)
return None
elif "/tree/main" in url:
os.chdir(zips_path)
response = requests.get(url)
soup = BeautifulSoup(response.content, "html.parser")
temp_url = ""
for link in soup.find_all("a", href=True):
if link["href"].endswith(".zip"):
temp_url = link["href"]
break
if temp_url:
url = temp_url
url = url.replace("blob", "resolve")
if "huggingface.co" not in url:
url = "https://huggingface.co" + url
wget.download(url)
else:
os.chdir(now_dir)
return None
elif "applio.org" in url:
parts = url.split("/")
id_with_query = parts[-1]
id_parts = id_with_query.split("?")
id_number = id_parts[0]
url = "https://cjtfqzjfdimgpvpwhzlv.supabase.co/rest/v1/models"
headers = {
"apikey": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6ImNqdGZxempmZGltZ3B2cHdoemx2Iiwicm9sZSI6ImFub24iLCJpYXQiOjE2OTUxNjczODgsImV4cCI6MjAxMDc0MzM4OH0.7z5WMIbjR99c2Ooc0ma7B_FyGq10G8X-alkCYTkKR10"
}
params = {"id": f"eq.{id_number}"}
response = requests.get(url, headers=headers, params=params)
if response.status_code == 200:
json_response = response.json()
print(json_response)
if json_response:
link = json_response[0]["link"]
verify = download_from_url(link)
if verify == "downloaded":
return "downloaded"
else:
return None
else:
return None
else:
try:
os.chdir(zips_path)
wget.download(url)
except Exception as error:
os.chdir(now_dir)
print(f"An error occurred downloading the file: {error}")
return None
for currentPath, _, zipFiles in os.walk(zips_path):
for Files in zipFiles:
filePart = Files.split(".")
extensionFile = filePart[len(filePart) - 1]
filePart.pop()
nameFile = "_".join(filePart)
realPath = os.path.join(currentPath, Files)
os.rename(realPath, nameFile + "." + extensionFile)
os.chdir(now_dir)
return "downloaded"
os.chdir(now_dir)
return None
def extract_and_show_progress(zipfile_path, unzips_path):
try:
with zipfile.ZipFile(zipfile_path, "r") as zip_ref:
for file_info in zip_ref.infolist():
zip_ref.extract(file_info, unzips_path)
os.remove(zipfile_path)
return True
except Exception as error:
print(f"An error occurred extracting the zip file: {error}")
return False
def unzip_file(zip_path, zip_file_name):
zip_file_path = os.path.join(zip_path, zip_file_name + ".zip")
extract_path = os.path.join(file_path, zip_file_name)
with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
zip_ref.extractall(extract_path)
os.remove(zip_file_path)
def model_download_pipeline(url: str):
try:
verify = download_from_url(url)
if verify == "downloaded":
extract_folder_path = ""
for filename in os.listdir(zips_path):
if filename.endswith(".zip"):
zipfile_path = os.path.join(zips_path, filename)
print("Proceeding with the extraction...")
model_zip = os.path.basename(zipfile_path)
model_name = format_title(model_zip.split(".zip")[0])
extract_folder_path = os.path.join(
"logs",
os.path.normpath(model_name),
)
success = extract_and_show_progress(
zipfile_path, extract_folder_path
)
macosx_path = os.path.join(extract_folder_path, "__MACOSX")
if os.path.exists(macosx_path):
shutil.rmtree(macosx_path)
subfolders = [
f
for f in os.listdir(extract_folder_path)
if os.path.isdir(os.path.join(extract_folder_path, f))
]
if len(subfolders) == 1:
subfolder_path = os.path.join(
extract_folder_path, subfolders[0]
)
for item in os.listdir(subfolder_path):
s = os.path.join(subfolder_path, item)
d = os.path.join(extract_folder_path, item)
shutil.move(s, d)
os.rmdir(subfolder_path)
for item in os.listdir(extract_folder_path):
if ".pth" in item:
file_name = item.split(".pth")[0]
if file_name != model_name:
os.rename(
os.path.join(extract_folder_path, item),
os.path.join(
extract_folder_path, model_name + ".pth"
),
)
else:
if "v2" not in item:
if "_nprobe_1_" in item and "_v1" in item:
file_name = item.split("_nprobe_1_")[1].split(
"_v1"
)[0]
if file_name != model_name:
new_file_name = (
item.split("_nprobe_1_")[0]
+ "_nprobe_1_"
+ model_name
+ "_v1"
)
os.rename(
os.path.join(extract_folder_path, item),
os.path.join(
extract_folder_path,
new_file_name + ".index",
),
)
else:
if "_nprobe_1_" in item and "_v2" in item:
file_name = item.split("_nprobe_1_")[1].split(
"_v2"
)[0]
if file_name != model_name:
new_file_name = (
item.split("_nprobe_1_")[0]
+ "_nprobe_1_"
+ model_name
+ "_v2"
)
os.rename(
os.path.join(extract_folder_path, item),
os.path.join(
extract_folder_path,
new_file_name + ".index",
),
)
if success:
print(f"Model {model_name} downloaded!")
else:
print(f"Error downloading {model_name}")
return "Error"
if extract_folder_path == "":
print("Zip file was not found.")
return "Error"
result = search_pth_index(extract_folder_path)
return result
else:
return "Error"
except Exception as error:
print(f"An unexpected error occurred: {error}")
return "Error"