Spaces:
Runtime error
Runtime error
import os | |
import re | |
import sys | |
from urllib.parse import parse_qs, unquote, urlencode, urlparse | |
import requests | |
import six | |
import wget | |
from bs4 import BeautifulSoup | |
now_dir = os.getcwd() | |
sys.path.append(now_dir) | |
from rvc.lib.tools import gdown | |
def find_folder_parent(search_dir, folder_name): | |
for dirpath, dirnames, _ in os.walk(search_dir): | |
if folder_name in dirnames: | |
return os.path.abspath(dirpath) | |
return None | |
file_path = find_folder_parent(now_dir, "logs") | |
zips_path = os.path.join(file_path, "zips") | |
def download_from_url(url): | |
os.makedirs(zips_path, exist_ok=True) | |
if url != "": | |
if "drive.google.com" in url: | |
if "file/d/" in url: | |
file_id = url.split("file/d/")[1].split("/")[0] | |
elif "id=" in url: | |
file_id = url.split("id=")[1].split("&")[0] | |
else: | |
return None | |
if file_id: | |
os.chdir(zips_path) | |
try: | |
gdown.download( | |
f"https://drive.google.com/uc?id={file_id}", | |
quiet=True, | |
fuzzy=True, | |
) | |
except Exception as error: | |
error_message = str(f"An error occurred downloading the file: {error}") | |
if "Too many users have viewed or downloaded this file recently" in error_message: | |
os.chdir(now_dir) | |
return "too much use" | |
elif "Cannot retrieve the public link of the file." in error_message: | |
os.chdir(now_dir) | |
return "private link" | |
else: | |
print(error_message) | |
os.chdir(now_dir) | |
return None | |
elif "disk.yandex.ru" in url: | |
base_url = "https://cloud-api.yandex.net/v1/disk/public/resources/download?" | |
public_key = url | |
final_url = base_url + urlencode(dict(public_key=public_key)) | |
response = requests.get(final_url) | |
download_url = response.json()["href"] | |
download_response = requests.get(download_url) | |
if download_response.status_code == 200: | |
filename = parse_qs(urlparse(unquote(download_url)).query).get("filename", [""])[0] | |
if filename: | |
os.chdir(zips_path) | |
with open(filename, "wb") as f: | |
f.write(download_response.content) | |
else: | |
print("Failed to get filename from URL.") | |
return None | |
elif "pixeldrain.com" in url: | |
try: | |
file_id = url.split("pixeldrain.com/u/")[1] | |
os.chdir(zips_path) | |
print(file_id) | |
response = requests.get(f"https://pixeldrain.com/api/file/{file_id}") | |
if response.status_code == 200: | |
file_name = response.headers.get("Content-Disposition").split("filename=")[-1].strip('";') | |
os.makedirs(zips_path, exist_ok=True) | |
with open(os.path.join(zips_path, file_name), "wb") as newfile: | |
newfile.write(response.content) | |
os.chdir(file_path) | |
return "downloaded" | |
else: | |
os.chdir(file_path) | |
return None | |
except Exception as error: | |
print(f"An error occurred downloading the file: {error}") | |
os.chdir(file_path) | |
return None | |
elif "cdn.discordapp.com" in url: | |
file = requests.get(url) | |
os.chdir(zips_path) | |
if file.status_code == 200: | |
name = url.split("/") | |
with open(os.path.join(name[-1]), "wb") as newfile: | |
newfile.write(file.content) | |
else: | |
return None | |
elif "/blob/" in url or "/resolve/" in url: | |
os.chdir(zips_path) | |
if "/blob/" in url: | |
url = url.replace("/blob/", "/resolve/") | |
response = requests.get(url, stream=True) | |
if response.status_code == 200: | |
content_disposition = six.moves.urllib_parse.unquote(response.headers["Content-Disposition"]) | |
m = re.search(r'filename="([^"]+)"', content_disposition) | |
file_name = m.groups()[0] | |
file_name = file_name.replace(os.path.sep, "_") | |
total_size_in_bytes = int(response.headers.get("content-length", 0)) | |
block_size = 1024 | |
progress_bar_length = 50 | |
progress = 0 | |
with open(os.path.join(zips_path, file_name), "wb") as file: | |
for data in response.iter_content(block_size): | |
file.write(data) | |
progress += len(data) | |
progress_percent = int((progress / total_size_in_bytes) * 100) | |
num_dots = int((progress / total_size_in_bytes) * progress_bar_length) | |
progress_bar = "[" + "." * num_dots + " " * (progress_bar_length - num_dots) + "]" | |
print( | |
f"{progress_percent}% {progress_bar} {progress}/{total_size_in_bytes} ", | |
end="\r", | |
) | |
if progress_percent == 100: | |
print("\n") | |
else: | |
os.chdir(now_dir) | |
return None | |
elif "/tree/main" in url: | |
os.chdir(zips_path) | |
response = requests.get(url) | |
soup = BeautifulSoup(response.content, "html.parser") | |
temp_url = "" | |
for link in soup.find_all("a", href=True): | |
if link["href"].endswith(".zip"): | |
temp_url = link["href"] | |
break | |
if temp_url: | |
url = temp_url | |
url = url.replace("blob", "resolve") | |
if "huggingface.co" not in url: | |
url = "https://huggingface.co" + url | |
wget.download(url) | |
else: | |
os.chdir(now_dir) | |
return None | |
elif "applio.org" in url: | |
parts = url.split("/") | |
id_with_query = parts[-1] | |
id_parts = id_with_query.split("?") | |
id_number = id_parts[0] | |
url = "https://cjtfqzjfdimgpvpwhzlv.supabase.co/rest/v1/models" | |
headers = { | |
"apikey": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6ImNqdGZxempmZGltZ3B2cHdoemx2Iiwicm9sZSI6ImFub24iLCJpYXQiOjE2OTUxNjczODgsImV4cCI6MjAxMDc0MzM4OH0.7z5WMIbjR99c2Ooc0ma7B_FyGq10G8X-alkCYTkKR10" | |
} | |
params = {"id": f"eq.{id_number}"} | |
response = requests.get(url, headers=headers, params=params) | |
if response.status_code == 200: | |
json_response = response.json() | |
print(json_response) | |
if json_response: | |
link = json_response[0]["link"] | |
verify = download_from_url(link) | |
if verify == "downloaded": | |
return "downloaded" | |
else: | |
return None | |
else: | |
return None | |
else: | |
try: | |
os.chdir(zips_path) | |
wget.download(url) | |
except Exception as error: | |
os.chdir(now_dir) | |
print(f"An error occurred downloading the file: {error}") | |
return None | |
for currentPath, _, zipFiles in os.walk(zips_path): | |
for Files in zipFiles: | |
filePart = Files.split(".") | |
extensionFile = filePart[len(filePart) - 1] | |
filePart.pop() | |
nameFile = "_".join(filePart) | |
realPath = os.path.join(currentPath, Files) | |
os.rename(realPath, nameFile + "." + extensionFile) | |
os.chdir(now_dir) | |
return "downloaded" | |
os.chdir(now_dir) | |
return None | |