# NOTE: `spaces` is kept as the very first import, as in the original file.
import spaces
import os
import glob
import json
import traceback
import logging
import gradio as gr
import numpy as np
import librosa
import torch
import asyncio
import ffmpeg
import subprocess
import sys
import io
import wave
from datetime import datetime
import urllib.request
import zipfile
import shutil
from textwrap import dedent
import pprint
import time
import re
import requests
from pathlib import Path
from scipy.io.wavfile import write
from scipy.io import wavfile
import soundfile as sf

from lib.infer_pack.models import (
    SynthesizerTrnMs256NSFsid,
    SynthesizerTrnMs256NSFsid_nono,
    SynthesizerTrnMs768NSFsid,
    SynthesizerTrnMs768NSFsid_nono,
)
from vc_infer_pipeline import VC
from config import Config

config = Config()
logging.getLogger("numba").setLevel(logging.WARNING)
spaces_hf = True  # os.getenv("SYSTEM") == "spaces"
force_support = True

# Filled in further down, once the runtime environment has been inspected.
audio_mode = []
f0method_mode = []
f0method_info = ""

# Browser-like headers used for the Bilibili search request.
headers = {
    "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36"
}
# Matches the first protocol-relative Bilibili video link in a search page.
pattern = r'//www\.bilibili\.com/video[^"]*'

# Download models
#urllib.request.urlretrieve("https://download.openxlab.org.cn/models/Kevin676/rvc-models/weight/hubert_base", "hubert_base.pt")
#urllib.request.urlretrieve("https://download.openxlab.org.cn/models/Kevin676/rvc-models/weight/rmvpe", "rmvpe.pt")

# Get zip name
pattern_zip = r"/([^/]+)\.zip$"


def get_file_name(url):
    """Return the base name (without ``.zip``) of the archive a URL points to.

    Raises:
        Exception: if the URL does not end in ``/<name>.zip``.
    """
    match = re.search(pattern_zip, url)
    if match:
        return match.group(1)
    raise Exception("没有找到AI歌手模型的zip压缩包。")


# Get RVC models
def extract_zip(extraction_folder, zip_name):
    """Unpack a voice-model archive and flatten it into ``extraction_folder``.

    The archive is deleted after extraction. The ``.pth`` model (larger than
    40 MiB) and, if present, the ``.index`` file (larger than 100 KiB) are
    moved to the top of ``extraction_folder``; every sub-directory is then
    removed.

    Raises:
        FileExistsError: if ``extraction_folder`` already exists.
        Exception: if no sufficiently large ``.pth`` file is found.
    """
    os.makedirs(extraction_folder)
    with zipfile.ZipFile(zip_name, 'r') as zip_ref:
        zip_ref.extractall(extraction_folder)
    os.remove(zip_name)

    index_filepath, model_filepath = None, None
    for root, _dirs, files in os.walk(extraction_folder):
        for name in files:
            candidate = os.path.join(root, name)
            size = os.stat(candidate).st_size
            # Size thresholds skip small stray files with a matching suffix.
            if name.endswith('.index') and size > 1024 * 100:
                index_filepath = candidate
            if name.endswith('.pth') and size > 1024 * 1024 * 40:
                model_filepath = candidate

    if not model_filepath:
        raise Exception(f'No .pth model file was found in the extracted zip. Please check {extraction_folder}.')

    # move model and index file to extraction folder
    os.rename(model_filepath, os.path.join(extraction_folder, os.path.basename(model_filepath)))
    if index_filepath:
        os.rename(index_filepath, os.path.join(extraction_folder, os.path.basename(index_filepath)))

    # remove any unnecessary nested folders
    for entry in os.listdir(extraction_folder):
        entry_path = os.path.join(extraction_folder, entry)
        if os.path.isdir(entry_path):
            shutil.rmtree(entry_path)


# Get username in OpenXLab
def get_username(url):
    """Return the path segment following ``models/`` in ``url``, or ``None``."""
    match_username = re.search(r'models/(.*?)/', url)
    if match_username:
        return match_username.group(1)
    return None


# Get username in Hugging Face
def get_username_hf(url):
    """Return the path segment following ``huggingface.co/``, or ``None``."""
    # The dot is escaped so that only the literal host name matches.
    match_username = re.search(r'huggingface\.co/(.*?)/', url)
    if match_username:
        return match_username.group(1)
    return None


def download_online_model(url, dir_name):
    """Download and unpack a voice-model zip unless it is already on disk.

    The model is stored under ``<zip_path>/<dir_name>``, where ``zip_path``
    is derived from the URL (``<user>-<zip name>`` for OpenXLab and Hugging
    Face links, otherwise just the zip name). If ``zip_path`` already exists
    nothing is downloaded.

    Raises:
        Exception: if the URL has no zip name, or the download / extraction
            fails. The original error is attached as ``__cause__``.
    """
    if url.startswith('https://download.openxlab.org.cn/models/'):
        zip_path = get_username(url) + "-" + get_file_name(url)
    elif url.startswith('https://huggingface.co/'):
        zip_path = get_username_hf(url) + "-" + get_file_name(url)
    else:
        zip_path = get_file_name(url)

    if os.path.exists(zip_path):
        print("P.S. AI歌手模型之前已经下载")
        return

    print("P.S. AI歌手模型还未下载")
    zip_name = url.split('/')[-1]
    try:
        extraction_folder = os.path.join(zip_path, dir_name)
        if os.path.exists(extraction_folder):
            raise Exception(f'Voice model directory {dir_name} already exists! Choose a different name for your voice model.')

        if 'pixeldrain.com' in url:
            url = f'https://pixeldrain.com/api/file/{zip_name}'

        urllib.request.urlretrieve(url, zip_name)
        extract_zip(extraction_folder, zip_name)
    except Exception as e:
        # `zip_path` did not exist before this attempt, so anything there now
        # is a partial result. Leaving it behind would make the next call
        # believe the model had already been downloaded.
        shutil.rmtree(zip_path, ignore_errors=True)
        if os.path.exists(zip_name):
            os.remove(zip_name)
        raise Exception(str(e)) from e


#Get bilibili BV id
def get_bilibili_video_id(url):
    """Return the video id that follows ``/video/`` in a Bilibili URL.

    Raises:
        Exception: if the URL contains no ``/video/<id>`` segment.
    """
    # The id is alphanumeric, so matching stops at the first '/' or '?';
    # a trailing slash is therefore not required.
    match = re.search(r'/video/([a-zA-Z0-9]+)', url)
    if match is None:
        raise Exception(f"Could not find a Bilibili video id in: {url}")
    return match.group(1)


# Get bilibili audio
def find_first_appearance_with_neighborhood(text, pattern):
    """Return the first substring of ``text`` matching ``pattern``, or ``None``."""
    match = re.search(pattern, text)
    return match.group() if match else None


def search_bilibili(keyword):
    """Search Bilibili and return the URL of the first video in the results.

    Keywords starting with ``BV`` are searched as-is; any other keyword is
    additionally sent with ``tids=3`` and ``page=1``.

    Raises:
        Exception: if the result page contains no video link.
    """
    params = {"keyword": keyword, "duration": 1}
    if not keyword.startswith("BV"):
        params["tids"] = 3
        params["page"] = 1
    # `params=` lets requests URL-encode the keyword (spaces, '&', '#', ...).
    req = requests.get("https://search.bilibili.com/all", params=params, headers=headers).text
    link = find_first_appearance_with_neighborhood(req, pattern)
    if link is None:
        raise Exception(f"No Bilibili video was found for: {keyword}")
    return "https:" + link


# Save bilibili audio
def get_response(html_url):
    """GET ``html_url`` with a Bilibili referer and a browser user agent."""
    request_headers = {
        "referer": "https://www.bilibili.com/",
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36"
    }
    return requests.get(html_url, headers=request_headers)


def get_video_info(html_url):
    """Return the URL of the first audio stream of a Bilibili video page.

    Raises:
        Exception: if the page carries no embedded play-info JSON.
    """
    response = get_response(html_url)
    # NOTE(review): the pattern in the original was an empty string, which can
    # never yield JSON. The play info is presumably embedded as
    # `<script>window.__playinfo__={...}</script>` -- confirm against a live page.
    match = re.search(r'<script>window\.__playinfo__=(.*?)</script>', response.text)
    if match is None:
        raise Exception(f"Could not find the play info of: {html_url}")
    json_data = json.loads(match.group(1))

    first_audio = json_data['data']['dash']['audio'][0]
    backup_urls = first_audio['backupUrl']
    # Prefer the first backup URL; fall back when it is missing or empty.
    if backup_urls:
        return backup_urls[0]
    return first_audio['baseUrl']


def save_audio(title, audio_url):
    """Download ``audio_url`` and write the raw bytes to ``<title>.wav``."""
    audio_content = get_response(audio_url).content
    with open(title + '.wav', mode='wb') as f:
        f.write(audio_content)
    print("音乐内容保存完成")


# Use UVR-HP5/2
urllib.request.urlretrieve("https://download.openxlab.org.cn/models/Kevin676/rvc-models/weight/UVR-HP2.pth", "uvr5/uvr_model/UVR-HP2.pth")
urllib.request.urlretrieve("https://download.openxlab.org.cn/models/Kevin676/rvc-models/weight/UVR-HP5.pth", "uvr5/uvr_model/UVR-HP5.pth")
#urllib.request.urlretrieve("https://huggingface.co/fastrolling/uvr/resolve/main/Main_Models/5_HP-Karaoke-UVR.pth", "uvr5/uvr_model/UVR-HP5.pth") from uvr5.vr import AudioPre weight_uvr5_root = "uvr5/uvr_model" uvr5_names = [] for name in os.listdir(weight_uvr5_root): if name.endswith(".pth") or "onnx" in name: uvr5_names.append(name.replace(".pth", "")) func = AudioPre pre_fun_hp2 = func( agg=int(10), model_path=os.path.join(weight_uvr5_root, "UVR-HP2.pth"), device="cuda", is_half=True, ) pre_fun_hp5 = func( agg=int(10), model_path=os.path.join(weight_uvr5_root, "UVR-HP5.pth"), device="cuda", is_half=True, ) # Separate vocals def youtube_downloader( filename, split_model, ): audio_path = filename.strip() + ".wav" # make dir output os.makedirs("output", exist_ok=True) if split_model=="UVR-HP2": pre_fun = pre_fun_hp2 else: pre_fun = pre_fun_hp5 pre_fun._path_audio_(audio_path, f"./output/{split_model}/{filename}/", f"./output/{split_model}/{filename}/", "wav") os.remove(filename.strip()+".wav") return f"./output/{split_model}/{filename}/vocal_{filename}.wav_10.wav", f"./output/{split_model}/{filename}/instrument_{filename}.wav_10.wav" # get duration import wave def get_duration_wave(file_path): with wave.open(file_path, 'r') as audio_file: frame_rate = audio_file.getframerate() n_frames = audio_file.getnframes() duration = n_frames / float(frame_rate) return duration # Original code if force_support is False or spaces_hf is True: if spaces_hf is True: audio_mode = ["Upload audio", "TTS Audio"] else: audio_mode = ["Input path", "Upload audio", "TTS Audio"] f0method_mode = ["pm", "harvest"] f0method_info = "PM is fast, Harvest is good but extremely slow, Rvmpe is alternative to harvest (might be better). 
(Default: PM)" else: audio_mode = ["Input path", "Upload audio", "Youtube", "TTS Audio"] f0method_mode = ["pm", "harvest", "crepe"] f0method_info = "PM is fast, Harvest is good but extremely slow, Rvmpe is alternative to harvest (might be better), and Crepe effect is good but requires GPU (Default: PM)" if os.path.isfile("rmvpe.pt"): f0method_mode.insert(2, "rmvpe") def create_vc_fn(model_name, tgt_sr, net_g, vc, if_f0, version, file_index): def vc_fn( vc_audio_mode, vc_input, vc_upload, tts_text, tts_voice, f0_up_key, f0_method, index_rate, filter_radius, resample_sr, rms_mix_rate, protect, ): try: logs = [] print(f"Converting using {model_name}...") logs.append(f"Converting using {model_name}...") yield "\n".join(logs), None if vc_audio_mode == "Input path" or "Youtube" and vc_input != "": audio, sr = librosa.load(vc_input, sr=16000, mono=True) elif vc_audio_mode == "Upload audio": if vc_upload is None: return "You need to upload an audio", None sampling_rate, audio = vc_upload duration = audio.shape[0] / sampling_rate if duration > 20 and spaces_hf: return "Please upload an audio file that is less than 20 seconds. 
If you need to generate a longer audio file, please use Colab.", None audio = (audio / np.iinfo(audio.dtype).max).astype(np.float32) if len(audio.shape) > 1: audio = librosa.to_mono(audio.transpose(1, 0)) if sampling_rate != 16000: audio = librosa.resample(audio, orig_sr=sampling_rate, target_sr=16000) times = [0, 0, 0] f0_up_key = int(f0_up_key) audio_opt = vc.pipeline( hubert_model, net_g, 0, audio, vc_input, times, f0_up_key, f0_method, file_index, # file_big_npy, index_rate, if_f0, filter_radius, tgt_sr, resample_sr, rms_mix_rate, version, protect, f0_file=None, ) info = f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}]: npy: {times[0]}, f0: {times[1]}s, infer: {times[2]}s" print(f"{model_name} | {info}") logs.append(f"Successfully Convert {model_name}\n{info}") yield "\n".join(logs), (tgt_sr, audio_opt) except Exception as err: info = traceback.format_exc() print(info) print(f"Error when using {model_name}.\n{str(err)}") yield info, None return vc_fn def combine_vocal_and_inst(model_name, song_name, song_id, split_model, cover_song, vocal_volume, inst_volume): #samplerate, data = wavfile.read(cover_song) vocal_path = cover_song #f"output/{split_model}/{song_id}/vocal_{song_id}.wav_10.wav" output_path = song_name.strip() + "-AI-" + ''.join(os.listdir(f"{model_name}")).strip() + "翻唱版.mp3" inst_path = f"output/{split_model}/{song_id}/instrument_{song_id}.wav_10.wav" #with wave.open(vocal_path, "w") as wave_file: #wave_file.setnchannels(1) #wave_file.setsampwidth(2) #wave_file.setframerate(samplerate) #wave_file.writeframes(data.tobytes()) command = f'ffmpeg -y -i {inst_path} -i {vocal_path} -filter_complex [0:a]volume={inst_volume}[i];[1:a]volume={vocal_volume}[v];[i][v]amix=inputs=2:duration=longest[a] -map [a] -b:a 320k -c:a libmp3lame {output_path}' result = subprocess.run(command.split(), stdout=subprocess.PIPE) print(result.stdout.decode()) return output_path def rvc_models(model_name): global vc, net_g, index_files, tgt_sr, version categories = [] models = [] 
for w_root, w_dirs, _ in os.walk(f"{model_name}"): model_count = 1 for sub_dir in w_dirs: pth_files = glob.glob(f"{model_name}/{sub_dir}/*.pth") index_files = glob.glob(f"{model_name}/{sub_dir}/*.index") if pth_files == []: print(f"Model [{model_count}/{len(w_dirs)}]: No Model file detected, skipping...") continue cpt = torch.load(pth_files[0]) tgt_sr = cpt["config"][-1] cpt["config"][-3] = cpt["weight"]["emb_g.weight"].shape[0] # n_spk if_f0 = cpt.get("f0", 1) version = cpt.get("version", "v1") if version == "v1": if if_f0 == 1: net_g = SynthesizerTrnMs256NSFsid(*cpt["config"], is_half=config.is_half) else: net_g = SynthesizerTrnMs256NSFsid_nono(*cpt["config"]) model_version = "V1" elif version == "v2": if if_f0 == 1: net_g = SynthesizerTrnMs768NSFsid(*cpt["config"], is_half=config.is_half) else: net_g = SynthesizerTrnMs768NSFsid_nono(*cpt["config"]) model_version = "V2" del net_g.enc_q print(net_g.load_state_dict(cpt["weight"], strict=False)) net_g.eval().to(config.device) if config.is_half: net_g = net_g.half() else: net_g = net_g.float() vc = VC(tgt_sr, config) if index_files == []: print("Warning: No Index file detected!") index_info = "None" model_index = "" else: index_info = index_files[0] model_index = index_files[0] print(f"Model loaded [{model_count}/{len(w_dirs)}]: {index_files[0]} / {index_info} | ({model_version})") model_count += 1 models.append((index_files[0][:-4], index_files[0][:-4], "", "", model_version, create_vc_fn(index_files[0], tgt_sr, net_g, vc, if_f0, version, model_index))) categories.append(["Models", "", models]) return vc, net_g, index_files, tgt_sr, version singers="您的专属AI歌手阵容:" @spaces.GPU(duration=120) def rvc_infer_music_gpu(zip_path, song_name, song_id, split_model, f0_up_key, vocal_volume, inst_volume): print("3.1.开始加载HuBert模型...") from fairseq import checkpoint_utils models, _, _ = checkpoint_utils.load_model_ensemble_and_task( ["hubert_base.pt"], suffix="", ) hubert_model = models[0] hubert_model = 
hubert_model.to(config.device) if config.is_half: hubert_model = hubert_model.half() else: hubert_model = hubert_model.float() hubert_model.eval() print("3.2.开始加载AI歌手模型参数...") rvc_models(zip_path) if os.path.isdir(f"./output/{split_model}/{song_id}")==True: print("4.直接开始推理(BGM之前已经去除)...") audio, sr = librosa.load(f"./output/{split_model}/{song_id}/vocal_{song_id}.wav_10.wav", sr=16000, mono=True) song_infer = vc.pipeline( hubert_model, net_g, 0, audio, "", [0, 0, 0], f0_up_key, "rmvpe", index_files[0], 0.7, 1, 3, tgt_sr, 0, 0.25, version, 0.33, f0_file=None, ) else: print("4.1.开始去除BGM...") audio, sr = librosa.load(youtube_downloader(song_id, split_model)[0], sr=16000, mono=True) print("4.2.开始推理...") song_infer = vc.pipeline( hubert_model, net_g, 0, audio, "", [0, 0, 0], f0_up_key, "rmvpe", index_files[0], 0.7, 1, 3, tgt_sr, 0, 0.25, version, 0.33, f0_file=None, ) sf.write(song_name.strip()+zip_path+"AI翻唱.wav", song_infer, tgt_sr) output_full_song = combine_vocal_and_inst(zip_path, song_name.strip(), song_id, split_model, song_name.strip()+zip_path+"AI翻唱.wav", vocal_volume, inst_volume) os.remove(song_name.strip()+zip_path+"AI翻唱.wav") return output_full_song @spaces.GPU(duration=30) def rvc_infer_upload_audio_gpu(zip_path, upload_audio, split_model, f0_up_key, vocal_volume, inst_volume): print("3.1.开始加载HuBert模型...") from fairseq import checkpoint_utils models, _, _ = checkpoint_utils.load_model_ensemble_and_task( ["hubert_base.pt"], suffix="", ) hubert_model = models[0] hubert_model = hubert_model.to(config.device) if config.is_half: hubert_model = hubert_model.half() else: hubert_model = hubert_model.float() hubert_model.eval() print("3.2.开始加载AI歌手模型参数...") rvc_models(zip_path) print("4.开始推理用户上传的歌曲...") audio, sr = librosa.load(upload_audio, sr=16000, mono=True) song_infer = vc.pipeline( hubert_model, net_g, 0, audio, "", [0, 0, 0], f0_up_key, "rmvpe", index_files[0], 0.7, 1, 3, tgt_sr, 0, 0.25, version, 0.33, f0_file=None, ) sf.write("AI" + 
''.join(os.listdir(f"{zip_path}")).strip() + "翻唱歌曲.wav", song_infer, tgt_sr) return "AI" + ''.join(os.listdir(f"{zip_path}")).strip() + "翻唱歌曲.wav" def rvc_infer_music(url, model_name, song_name, upload_audio, split_model, f0_up_key, vocal_volume, inst_volume): url = url.strip().replace(" ", "") model_name = model_name.strip().replace(" ", "") if url.startswith('https://download.openxlab.org.cn/models/'): zip_path = get_username(url) + "-" + get_file_name(url) elif url.startswith('https://huggingface.co/'): zip_path = get_username_hf(url) + "-" + get_file_name(url) else: zip_path = get_file_name(url) global singers if model_name not in singers: singers = singers+ ' '+ model_name print("1.开始下载AI歌手模型...") download_online_model(url, model_name) if upload_audio is None: video_identifier = search_bilibili(song_name.strip()) song_name = song_name.strip().replace(" ", "") song_id = get_bilibili_video_id(video_identifier) print(video_identifier) video_info = get_video_info(video_identifier) print(video_info) audio_content = get_response(video_info).content print("2.开始下载AI翻唱歌曲...") with open(song_id.strip() + ".wav", mode="wb") as f: f.write(audio_content) output_full_song = rvc_infer_music_gpu(zip_path, song_name, song_id, split_model, f0_up_key, vocal_volume, inst_volume) return output_full_song, singers else: song_duration = get_duration_wave(upload_audio) if song_duration < 480: print(f"上传歌曲时长:{song_duration}秒") output_full_song = rvc_infer_upload_audio_gpu(zip_path, upload_audio, split_model, f0_up_key, vocal_volume, inst_volume) else: raise Exception('抱歉!您上传的歌曲时长超过了8分钟,请上传短于8分钟的歌曲。') return output_full_song, singers app = gr.Blocks(theme="JohnSmith9982/small_and_pretty") with app: with gr.Tab("中文版"): gr.Markdown("#