Applio / rvc /lib /tools /model_download.py
Aitron Emper
Update rvc/lib/tools/model_download.py
5024722 verified
raw
history blame
13.9 kB
import os
import sys
import wget
import zipfile
from bs4 import BeautifulSoup
import requests
from urllib.parse import unquote, urlencode, parse_qs, urlparse
import re
import shutil
import six
def find_folder_parent(search_dir, folder_name):
    """Locate the directory that directly contains ``folder_name``.

    Walks ``search_dir`` top-down (the root itself is included) and returns
    the absolute path of the first directory whose immediate sub-directories
    include ``folder_name``. Returns ``None`` when no such directory exists.
    """
    candidates = (
        os.path.abspath(parent)
        for parent, children, _files in os.walk(search_dir)
        if folder_name in children
    )
    return next(candidates, None)
# Directory the process was started from; captured once at import time.
now_dir = os.getcwd()
# Presumably added so the project-local `rvc` imports below resolve when the
# process is launched from the repository root -- TODO confirm.
sys.path.append(now_dir)
from rvc.lib.utils import format_title
from rvc.lib.tools import gdown
# Parent directory of the first "logs" folder found under now_dir.
# NOTE(review): this is None when no "logs" folder exists yet at import time.
file_path = find_folder_parent(now_dir, "logs")
# Absolute path of the folder that receives downloaded archives.
zips_path = os.getcwd() + "/logs/zips"
def search_pth_index(folder):
    """Collect the model weight and index files sitting directly in ``folder``.

    Returns a ``(pth_paths, index_paths)`` tuple of lists. Every entry is
    ``folder`` joined with the file name; sub-directories are ignored even
    when their name carries a matching suffix.
    """
    entries = os.listdir(folder)

    def files_with_suffix(suffix):
        # Same test as before: must be a regular file and end with the suffix.
        matches = []
        for entry in entries:
            full_path = os.path.join(folder, entry)
            if os.path.isfile(full_path) and entry.endswith(suffix):
                matches.append(full_path)
        return matches

    return files_with_suffix(".pth"), files_with_suffix(".index")
def get_mediafire_download_link(url):
    """Scrape the direct download link out of the page at ``url``.

    Fetches the page, raises ``requests.HTTPError`` on an error status, and
    looks for the anchor carrying the ``input popsok`` class and the
    ``Download file`` aria-label. Returns that anchor's ``href`` or ``None``
    when the anchor is not on the page.
    """
    page = requests.get(url)
    page.raise_for_status()

    button_attrs = {"class": "input popsok", "aria-label": "Download file"}
    anchor = BeautifulSoup(page.text, "html.parser").find("a", button_attrs)
    if not anchor:
        return None
    return anchor.get("href")
def _safe_download_name(candidate):
    """Reduce a server-supplied name to a bare file name ("" if unusable)."""
    # Strip any directory part so a hostile name cannot escape zips_path.
    name = os.path.basename(str(candidate or "").replace("\\", "/")).strip()
    return "" if name in (".", "..") else name


def _normalize_downloaded_names(root):
    """Move files under ``root`` to its top level with dots in the stem replaced by "_"."""
    for current_path, _, file_names in os.walk(root):
        for file_name in file_names:
            stem, extension = os.path.splitext(file_name)
            if not extension:
                # Nothing to normalise (previously "name" became ".name").
                continue
            source = os.path.join(current_path, file_name)
            # Absolute target: does not depend on the current directory.
            target = os.path.join(root, stem.replace(".", "_") + extension)
            if source != target:
                os.rename(source, target)


def download_from_url(url):
    """Download the file behind ``url`` into ``zips_path``.

    The host is recognised by substring: Google Drive, Yandex Disk,
    pixeldrain, the Discord CDN, Hugging Face (``/blob/``, ``/resolve/`` and
    ``/tree/main`` links) and applio.org model pages. Any other URL is handed
    to ``wget``.

    Returns:
        ``"downloaded"`` on success, ``"too much use"`` or ``"private link"``
        for the two recognised Google Drive errors, and ``None`` for every
        other failure (including an empty ``url``).

    The process working directory is switched to ``zips_path`` while
    downloading, because ``gdown`` and ``wget`` are called without an output
    path, and is always restored to ``now_dir`` before returning or raising.
    """
    os.makedirs(zips_path, exist_ok=True)
    try:
        if not url:
            return None
        os.chdir(zips_path)

        if "drive.google.com" in url:
            if "file/d/" in url:
                file_id = url.split("file/d/")[1].split("/")[0].split("?")[0]
            elif "id=" in url:
                file_id = url.split("id=")[1].split("&")[0]
            else:
                return None
            if not file_id:
                # Nothing to fetch; do not report a download that never ran.
                return None
            try:
                gdown.download(
                    f"https://drive.google.com/uc?id={file_id}",
                    quiet=True,
                    fuzzy=True,
                )
            except Exception as error:
                error_message = str(error)
                if (
                    "Too many users have viewed or downloaded this file recently"
                    in error_message
                ):
                    return "too much use"
                if "Cannot retrieve the public link of the file." in error_message:
                    return "private link"
                print(error_message)
                return None

        elif "disk.yandex.ru" in url:
            base_url = "https://cloud-api.yandex.net/v1/disk/public/resources/download?"
            final_url = base_url + urlencode(dict(public_key=url))
            download_url = requests.get(final_url).json().get("href")
            if not download_url:
                print("Failed to get download link from Yandex Disk.")
                return None
            download_response = requests.get(download_url)
            if download_response.status_code != 200:
                print("Failed to download file from Yandex Disk.")
                return None
            filename = _safe_download_name(
                parse_qs(urlparse(unquote(download_url)).query).get(
                    "filename", [""]
                )[0]
            )
            if not filename:
                print("Failed to get filename from URL.")
                return None
            with open(os.path.join(zips_path, filename), "wb") as f:
                f.write(download_response.content)

        elif "pixeldrain.com" in url:
            try:
                file_id = url.split("pixeldrain.com/u/")[1]
                print(file_id)
                response = requests.get(f"https://pixeldrain.com/api/file/{file_id}")
                if response.status_code != 200:
                    return None
                disposition = response.headers.get("Content-Disposition")
                if not disposition:
                    return None
                file_name = _safe_download_name(
                    disposition.split("filename=")[-1].strip('";')
                )
                if not file_name:
                    return None
                with open(os.path.join(zips_path, file_name), "wb") as newfile:
                    newfile.write(response.content)
                # As before, this branch reports success without the
                # name-normalisation pass below.
                return "downloaded"
            except Exception as e:
                print(e)
                return None

        elif "cdn.discordapp.com" in url:
            response = requests.get(url)
            if response.status_code != 200:
                return None
            # Take the name from the URL path so a "?ex=..." query string
            # does not end up in the file name.
            file_name = _safe_download_name(urlparse(url).path)
            if not file_name:
                return None
            with open(os.path.join(zips_path, file_name), "wb") as newfile:
                newfile.write(response.content)

        elif "/blob/" in url or "/resolve/" in url:
            if "/blob/" in url:
                url = url.replace("/blob/", "/resolve/")
            with requests.get(url, stream=True) as response:
                if response.status_code != 200:
                    return None
                content_disposition = six.moves.urllib_parse.unquote(
                    response.headers.get("Content-Disposition", "")
                )
                m = re.search(r'filename="([^"]+)"', content_disposition)
                # Fall back to the URL path when the header is missing or
                # has an unexpected shape.
                raw_name = m.group(1) if m else urlparse(url).path
                file_name = _safe_download_name(raw_name.replace(os.path.sep, "_"))
                if not file_name:
                    return None
                total_size_in_bytes = int(response.headers.get("content-length", 0))
                block_size = 1024
                progress_bar_length = 50
                progress = 0
                with open(os.path.join(zips_path, file_name), "wb") as file:
                    for data in response.iter_content(block_size):
                        file.write(data)
                        progress += len(data)
                        if total_size_in_bytes <= 0:
                            # Size unknown: skip the bar instead of dividing by zero.
                            continue
                        fraction = min(progress / total_size_in_bytes, 1.0)
                        progress_percent = int(fraction * 100)
                        num_dots = int(fraction * progress_bar_length)
                        progress_bar = (
                            "["
                            + "." * num_dots
                            + " " * (progress_bar_length - num_dots)
                            + "]"
                        )
                        print(
                            f"{progress_percent}% {progress_bar} {progress}/{total_size_in_bytes} ",
                            end="\r",
                        )
                if total_size_in_bytes > 0 and progress >= total_size_in_bytes:
                    print("\n")

        elif "/tree/main" in url:
            response = requests.get(url)
            soup = BeautifulSoup(response.content, "html.parser")
            temp_url = ""
            for link in soup.find_all("a", href=True):
                if link["href"].endswith(".zip"):
                    temp_url = link["href"]
                    break
            if not temp_url:
                return None
            url = temp_url.replace("blob", "resolve")
            if "huggingface.co" not in url:
                url = "https://huggingface.co" + url
            wget.download(url)

        elif "applio.org" in url:
            id_number = url.split("/")[-1].split("?")[0]
            api_url = "https://cjtfqzjfdimgpvpwhzlv.supabase.co/rest/v1/models"
            # NOTE(review): API key embedded in source; the token payload
            # looks like an anonymous-role key -- confirm, and consider
            # moving it to configuration.
            headers = {
                "apikey": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6ImNqdGZxempmZGltZ3B2cHdoemx2Iiwicm9sZSI6ImFub24iLCJpYXQiOjE2OTUxNjczODgsImV4cCI6MjAxMDc0MzM4OH0.7z5WMIbjR99c2Ooc0ma7B_FyGq10G8X-alkCYTkKR10"
            }
            params = {"id": f"eq.{id_number}"}
            response = requests.get(api_url, headers=headers, params=params)
            if response.status_code != 200:
                return None
            json_response = response.json()
            print(json_response)
            if not json_response:
                return None
            link = json_response[0]["link"]
            # Only plain success is propagated, as before.
            if download_from_url(link) == "downloaded":
                return "downloaded"
            return None

        else:
            try:
                wget.download(url)
            except Exception as error:
                print(error)
                return None

        _normalize_downloaded_names(zips_path)
        return "downloaded"
    finally:
        # Single restore point for every return and exception path.
        os.chdir(now_dir)
def extract_and_show_progress(zipfile_path, unzips_path):
    """Unpack ``zipfile_path`` into ``unzips_path`` and delete the archive.

    Returns ``True`` when every member was extracted and the archive was
    removed. Any exception is printed and reported as ``False``; in that case
    the archive is left where it was.
    """
    try:
        with zipfile.ZipFile(zipfile_path, "r") as archive:
            archive.extractall(unzips_path)
        os.remove(zipfile_path)
    except Exception as error:
        print(error)
        return False
    else:
        return True
def unzip_file(zip_path, zip_file_name):
    """Extract ``<zip_path>/<zip_file_name>.zip`` and remove the archive.

    The contents go into a folder named ``zip_file_name`` under the
    module-level ``file_path``. Errors from opening or extracting the archive
    propagate to the caller, and the archive is then not removed.
    """
    archive = os.path.join(zip_path, zip_file_name + ".zip")
    destination = os.path.join(file_path, zip_file_name)

    with zipfile.ZipFile(archive, "r") as bundle:
        bundle.extractall(destination)

    os.remove(archive)
# <prefix>_nprobe_1_<name>[_v1|_v2].index -- the version is the LAST suffix,
# so a model name that itself contains "v2" is not mistaken for the version.
_INDEX_NAME_RE = re.compile(
    r"^(?P<prefix>.*?)_nprobe_1_(?P<name>.*?)(?:_(?P<version>v[12]))?\.index$"
)


def _flatten_single_subfolder(folder):
    """Hoist the contents of ``folder``'s only sub-directory into ``folder``."""
    subfolders = [
        entry
        for entry in os.listdir(folder)
        if os.path.isdir(os.path.join(folder, entry))
    ]
    if len(subfolders) != 1:
        return
    subfolder_path = os.path.join(folder, subfolders[0])
    for item in os.listdir(subfolder_path):
        shutil.move(os.path.join(subfolder_path, item), os.path.join(folder, item))
    os.rmdir(subfolder_path)


def _rename_model_files(folder, model_name):
    """Rename the ``.pth`` and ``.index`` files in ``folder`` after ``model_name``."""
    for item in os.listdir(folder):
        if item.endswith(".pth"):
            if item[: -len(".pth")] == model_name:
                continue
            new_name = model_name + ".pth"
        else:
            match = _INDEX_NAME_RE.match(item)
            if match is None:
                # Not an index file (README, config, ...): leave it alone
                # instead of failing on a missing "_nprobe_1_" marker.
                continue
            if match.group("name") == model_name and match.group("version"):
                continue
            # Files without a version suffix are treated as v1, as before.
            version = match.group("version") or "v1"
            new_name = (
                match.group("prefix") + "_nprobe_1_" + model_name + "_" + version + ".index"
            )
        target = os.path.join(folder, new_name)
        if os.path.exists(target):
            # Never overwrite a file that already carries the final name
            # (e.g. an archive shipping several .pth files).
            continue
        os.rename(os.path.join(folder, item), target)


def model_download_pipeline(url):
    """Download a model archive from ``url``, unpack it and normalise its files.

    Every ``.zip`` found in ``zips_path`` after the download is extracted
    into ``logs/<model name>`` (relative to the current directory), a lone
    wrapping sub-folder is flattened, and the weight/index files are renamed
    after the model.

    Returns:
        The ``(pth_paths, index_paths)`` tuple from ``search_pth_index`` for
        the last successfully extracted model, or the string ``"Error"`` when
        the download failed, no archive was found, or no archive could be
        extracted.
    """
    if download_from_url(url) != "downloaded":
        return "Error"

    found_zip = False
    extract_folder_path = ""
    for filename in os.listdir(zips_path):
        if not filename.endswith(".zip"):
            continue
        found_zip = True
        zipfile_path = os.path.join(zips_path, filename)
        print("Proceeding with the extraction...")

        model_name = format_title(filename.split(".zip")[0])
        target_folder = os.path.join("logs", os.path.normpath(model_name))

        if not extract_and_show_progress(zipfile_path, target_folder):
            # The folder may not exist; do not try to list or rename in it.
            print(f"Error downloading {model_name}")
            continue

        _flatten_single_subfolder(target_folder)
        _rename_model_files(target_folder, model_name)
        extract_folder_path = target_folder
        print(f"Model {model_name} downloaded!")

    if not found_zip:
        print("Zip file was not found.")
        return "Error"
    if extract_folder_path == "":
        return "Error"
    return search_pth_index(extract_folder_path)