import os
import shutil
import sys
import pdb
import re

# The argv-based v1/v2 switch is disabled; "v2" is always exported.
version = "v2"  # if sys.argv[-1]=="v2" else"v1"
os.environ["version"] = version
now_dir = os.getcwd()
sys.path.insert(0, now_dir)
import json
import yaml
import warnings
import torch
import platform
import psutil
import signal

warnings.filterwarnings("ignore")
torch.manual_seed(233333)

# Use a private TEMP directory under the working directory and empty it on
# startup, keeping only the jieba cache file.
tmp = os.path.join(now_dir, "TEMP")
os.makedirs(tmp, exist_ok=True)
os.environ["TEMP"] = tmp
for name in os.listdir(tmp):
    if name == "jieba.cache":
        continue
    path = "%s/%s" % (tmp, name)
    delete = os.remove if os.path.isfile(path) else shutil.rmtree
    try:
        delete(path)
    except Exception as e:
        # Best effort: report the failure and keep going.
        print(str(e))

import site

site_packages_roots = [
    root for root in site.getsitepackages() if "packages" in root
]
if not site_packages_roots:
    site_packages_roots = ["%s/runtime/Lib/site-packages" % now_dir]
# os.environ["OPENBLAS_NUM_THREADS"] = "4"
os.environ["no_proxy"] = "localhost, 127.0.0.1, ::1"
os.environ["all_proxy"] = ""

# Write a users.pth listing the project directories into the first existing
# site-packages directory that accepts the write.
for site_packages_root in site_packages_roots:
    if not os.path.exists(site_packages_root):
        continue
    try:
        with open("%s/users.pth" % (site_packages_root), "w") as f:
            f.write(
                "%s\n%s/tools\n%s/tools/damo_asr\n%s/GPT_SoVITS\n%s/tools/uvr5"
                % (now_dir, now_dir, now_dir, now_dir, now_dir)
            )
        break
    except PermissionError:
        # Not writable; try the next candidate.
        pass

from tools import my_utils
import traceback
import gradio as gr
from subprocess import Popen
from config import python_exec,infer_device,is_half,exp_root,webui_port_main,webui_port_infer_tts,webui_port_uvr5,webui_port_subfix,is_share
from tools.i18n.i18n import I18nAuto

i18n = I18nAuto()
from scipy.io import wavfile
from tools.my_utils import load_audio
from multiprocessing import cpu_count

# os.environ['PYTORCH_ENABLE_MPS_FALLBACK'] = '1'  # fall back to CPU for steps MPS does not support
n_cpu = cpu_count()
ngpu = torch.cuda.device_count()
gpu_infos = []
mem = []
if_gpu_ok = False  # becomes True once an NVIDIA card usable for training / accelerated inference is found
# A card counts as usable when its upper-cased name contains any of these
# fragments (e.g. A10, A100, V100, A40, P40, M40, K80, A4500).
ok_gpu_keywords = {"10", "16", "20", "30", "40", "A2", "A3", "A4", "P4", "A50", "500", "A60", "70", "80", "90", "M4", "T4", "TITAN", "L4", "4060", "H"}
set_gpu_numbers = set()
if torch.cuda.is_available() or ngpu != 0:
    for i in range(ngpu):
        gpu_name = torch.cuda.get_device_name(i)
        if any(value in gpu_name.upper() for value in ok_gpu_keywords):
            if_gpu_ok = True  # at least one usable NVIDIA card
            gpu_infos.append("%s\t%s" % (i, gpu_name))
            set_gpu_numbers.add(i)
            # Total memory in GiB; the +0.4 rounds values just under a whole GiB up.
            mem.append(int(torch.cuda.get_device_properties(i).total_memory / 1024 / 1024 / 1024 + 0.4))
# # Check whether MPS acceleration is supported
# if torch.backends.mps.is_available():
#     if_gpu_ok = True
#     gpu_infos.append("%s\t%s" % ("0", "Apple GPU"))
#     mem.append(psutil.virtual_memory().total/ 1024 / 1024 / 1024)  # in testing, using system RAM as VRAM did not run out of memory

if if_gpu_ok and len(gpu_infos) > 0:
    gpu_info = "\n".join(gpu_infos)
    default_batch_size = min(mem) // 2
else:
    # No usable card: expose a single pseudo-device "0" labelled CPU and size
    # the default batch from half of the system RAM (in GiB).
    gpu_info = "%s\t%s" % ("0", "CPU")
    gpu_infos.append("%s\t%s" % ("0", "CPU"))
    set_gpu_numbers.add(0)
    default_batch_size = int(psutil.virtual_memory().total / 1024 / 1024 / 1024 / 2)

# "-"-joined device indices. Take the whole field before the tab rather than
# only the first character, so indices >= 10 are not truncated.
gpus = "-".join([info.split("\t")[0] for info in gpu_infos])
default_gpu_numbers = str(min(set_gpu_numbers))


def fix_gpu_number(input):
    """Force an out-of-range GPU number back into the detected set.

    Returns ``default_gpu_numbers`` when ``int(input)`` is not a detected
    device index; otherwise (including when ``input`` cannot be converted to
    an int) returns ``input`` unchanged. The parameter name shadows the
    builtin but is kept so existing callers are unaffected.
    """
    try:
        if int(input) not in set_gpu_numbers:
            return default_gpu_numbers
    except (TypeError, ValueError, OverflowError):
        return input
    return input


def fix_gpu_numbers(inputs):
    """Apply ``fix_gpu_number`` to every item of a comma-separated string.

    Returns the corrected comma-separated string, or ``inputs`` unchanged when
    it cannot be split (i.e. it is not a string).
    """
    try:
        return ",".join(str(fix_gpu_number(item)) for item in inputs.split(","))
    except (AttributeError, TypeError):
        return inputs


pretrained_sovits_name = "GPT_SoVITS/pretrained_models/s2G2333k.pth"
pretrained_gpt_name = "GPT_SoVITS/pretrained_models/s1bert25hz-5kh-longer-epoch=12-step=369668.ckpt"


def get_weights_names():
    """List selectable SoVITS and GPT weights.

    Each list starts with the pretrained model path, followed by the ``.pth``
    (SoVITS) or ``.ckpt`` (GPT) file names found in the matching weight
    directory. Returns ``(SoVITS_names, GPT_names)``.
    """
    SoVITS_names = [pretrained_sovits_name]
    SoVITS_names += [name for name in os.listdir(SoVITS_weight_root) if name.endswith(".pth")]
    GPT_names = [pretrained_gpt_name]
    GPT_names += [name for name in os.listdir(GPT_weight_root) if name.endswith(".ckpt")]
    return SoVITS_names, GPT_names


SoVITS_weight_root = "SoVITS_weights"
GPT_weight_root = "GPT_weights"
os.makedirs(SoVITS_weight_root, exist_ok=True)
os.makedirs(GPT_weight_root, exist_ok=True)
SoVITS_names, GPT_names = get_weights_names()


def _update(**fields):
    """Build a component-update dict (``"__type__": "update"``) with the given fields."""
    return {"__type__": "update", **fields}


def _busy_buttons():
    """Updates for a (start, stop) button pair while a task runs: start hidden, stop shown."""
    return _update(visible=False), _update(visible=True)


def _idle_buttons():
    """Updates for a (start, stop) button pair when no task runs: start shown, stop hidden."""
    return _update(visible=True), _update(visible=False)


def _spawn(cmd):
    """Print a shell command line and start it with ``Popen(shell=True)``."""
    # NOTE(review): commands are built by string formatting from UI text boxes and
    # run through the shell; kept as-is for compatibility, but worth hardening.
    print(cmd)
    return Popen(cmd, shell=True)


def _kill_all(procs):
    """Kill every process in ``procs``, logging (not raising) individual failures."""
    for proc in procs:
        try:
            kill_process(proc.pid)
        except Exception:
            traceback.print_exc()


def _spawn_parts(script_name, env_config, gpu_numbers, procs):
    """Start one ``GPT_SoVITS/prepare_datasets`` script per "-"-separated GPU number.

    Before each launch ``env_config`` is extended with ``i_part``,
    ``all_parts`` and ``_CUDA_VISIBLE_DEVICES`` and exported into
    ``os.environ``. Started processes are appended to ``procs``.
    Returns the number of parts.
    """
    gpu_names = gpu_numbers.split("-")
    all_parts = len(gpu_names)
    for i_part, gpu_name in enumerate(gpu_names):
        env_config.update(
            {
                "i_part": str(i_part),
                "all_parts": str(all_parts),
                "_CUDA_VISIBLE_DEVICES": fix_gpu_number(gpu_name),
            }
        )
        os.environ.update(env_config)
        procs.append(_spawn('"%s" GPT_SoVITS/prepare_datasets/%s' % (python_exec, script_name)))
    return all_parts


def _merge_part_files(opt_dir, part_pattern, all_parts, lines):
    """Append the lines of every per-part output file to ``lines`` and delete the files.

    ``part_pattern`` is a file-name template with one ``%s`` for the part index.
    Raises ``OSError`` (e.g. ``FileNotFoundError``) when a part file is missing.
    """
    for i_part in range(all_parts):
        part_path = "%s/%s" % (opt_dir, part_pattern % i_part)
        with open(part_path, "r", encoding="utf8") as f:
            lines += f.read().strip("\n").split("\n")
        os.remove(part_path)
    return lines


def custom_sort_key(s):
    """Natural-sort key: split ``s`` into text and number runs, numbers compared as ints.

    ``re.split`` with one capturing group puts the digit runs at the odd
    indices, so parity (rather than ``str.isdigit``) decides what is converted.
    This keeps the int/str positions aligned between keys and avoids calling
    ``int`` on characters such as "²" that ``isdigit`` accepts but ``\\d`` does
    not match.
    """
    parts = re.split(r"(\d+)", s)
    return [int(part) if index % 2 else part for index, part in enumerate(parts)]


def change_choices():
    """Re-scan the weight folders and return dropdown updates for (SoVITS, GPT)."""
    SoVITS_names, GPT_names = get_weights_names()
    return (
        _update(choices=sorted(SoVITS_names, key=custom_sort_key)),
        _update(choices=sorted(GPT_names, key=custom_sort_key)),
    )


# Handles of the single-instance helper processes (None while not running).
p_label = None
p_uvr5 = None
p_asr = None
p_denoise = None
p_tts_inference = None


def kill_proc_tree(pid, including_parent=True):
    """Send SIGTERM to all descendants of ``pid`` and, optionally, to ``pid`` itself.

    Does nothing when the process no longer exists; ``OSError`` from
    individual kills is ignored.
    """
    try:
        parent = psutil.Process(pid)
    except psutil.NoSuchProcess:
        # Process already terminated.
        return

    targets = parent.children(recursive=True)
    if including_parent:
        targets.append(parent)  # children first, parent last
    for proc in targets:
        try:
            os.kill(proc.pid, signal.SIGTERM)  # or signal.SIGKILL
        except OSError:
            pass


system = platform.system()


def kill_process(pid):
    """Kill ``pid`` and its children (``taskkill`` on Windows, SIGTERM tree elsewhere)."""
    if system == "Windows":
        os.system("taskkill /t /f /pid %s" % pid)
    else:
        kill_proc_tree(pid)


def change_label(if_label, path_list):
    """Start or stop the labelling WebUI according to the checkbox state.

    Yields a status message when a state change happens; yields nothing otherwise.
    """
    global p_label
    if if_label and p_label is None:
        path_list = my_utils.clean_path(path_list)
        cmd = '"%s" tools/subfix_webui.py --load_list "%s" --webui_port %s --is_share %s' % (
            python_exec, path_list, webui_port_subfix, is_share)
        yield i18n("打标工具WebUI已开启")
        p_label = _spawn(cmd)
    elif not if_label and p_label is not None:
        kill_process(p_label.pid)
        p_label = None
        yield i18n("打标工具WebUI已关闭")


def change_uvr5(if_uvr5):
    """Start or stop the UVR5 WebUI according to the checkbox state."""
    global p_uvr5
    if if_uvr5 and p_uvr5 is None:
        cmd = '"%s" tools/uvr5/webui.py "%s" %s %s %s' % (
            python_exec, infer_device, is_half, webui_port_uvr5, is_share)
        yield i18n("UVR5已开启")
        p_uvr5 = _spawn(cmd)
    elif not if_uvr5 and p_uvr5 is not None:
        kill_process(p_uvr5.pid)
        p_uvr5 = None
        yield i18n("UVR5已关闭")


def change_tts_inference(if_tts, bert_path, cnhubert_base_path, gpu_number, gpt_path, sovits_path):
    """Start or stop the TTS inference WebUI according to the checkbox state.

    Model paths and settings are exported into ``os.environ`` before the
    launch. A weight name without "/" is resolved inside its weight folder.
    """
    global p_tts_inference
    if if_tts and p_tts_inference is None:
        os.environ["gpt_path"] = gpt_path if "/" in gpt_path else "%s/%s" % (GPT_weight_root, gpt_path)
        os.environ["sovits_path"] = sovits_path if "/" in sovits_path else "%s/%s" % (SoVITS_weight_root, sovits_path)
        os.environ["cnhubert_base_path"] = cnhubert_base_path
        os.environ["bert_path"] = bert_path
        # os.environ only accepts str; fix_gpu_number may hand back its input unchanged.
        os.environ["_CUDA_VISIBLE_DEVICES"] = str(fix_gpu_number(gpu_number))
        os.environ["is_half"] = str(is_half)
        os.environ["infer_ttswebui"] = str(webui_port_infer_tts)
        os.environ["is_share"] = str(is_share)
        cmd = '"%s" GPT_SoVITS/inference_webui.py' % (python_exec)
        yield i18n("TTS推理进程已开启")
        p_tts_inference = _spawn(cmd)
    elif not if_tts and p_tts_inference is not None:
        kill_process(p_tts_inference.pid)
        p_tts_inference = None
        yield i18n("TTS推理进程已关闭")


from tools.asr.config import asr_dict


def open_asr(asr_inp_dir, asr_opt_dir, asr_model, asr_model_size, asr_lang, asr_precision):
    """Run the selected ASR script and block until it exits.

    Yields ``(message, start_button, stop_button, list_path)`` updates: once
    when the task starts and once when it ends, the latter carrying the
    absolute path of the expected ``.list`` output file.
    """
    global p_asr
    if p_asr is not None:
        yield ("已有正在进行的ASR任务,需先终止才能开启下一次任务", *_busy_buttons(), _update())
        return
    asr_inp_dir = my_utils.clean_path(asr_inp_dir)
    asr_opt_dir = my_utils.clean_path(asr_opt_dir)
    cmd = f'"{python_exec}" tools/asr/{asr_dict[asr_model]["path"]}'
    cmd += f' -i "{asr_inp_dir}"'
    cmd += f' -o "{asr_opt_dir}"'
    cmd += f' -s {asr_model_size}'
    cmd += f' -l {asr_lang}'
    cmd += f" -p {asr_precision}"
    output_file_name = os.path.basename(asr_inp_dir)
    output_folder = asr_opt_dir or "output/asr_opt"
    output_file_path = os.path.abspath(f'{output_folder}/{output_file_name}.list')
    yield ("ASR任务开启:%s" % cmd, *_busy_buttons(), _update())
    p_asr = _spawn(cmd)
    p_asr.wait()
    p_asr = None
    yield ("ASR任务完成, 查看终端进行下一步", *_idle_buttons(), _update(value=output_file_path))


def close_asr():
    """Kill the running ASR process, if any, and restore the buttons."""
    global p_asr
    if p_asr is not None:
        kill_process(p_asr.pid)
        p_asr = None
    return ("已终止ASR进程", *_idle_buttons())


def open_denoise(denoise_inp_dir, denoise_opt_dir):
    """Run the denoise script and block until it exits.

    Yields ``(message, start_button, stop_button, output_dir)`` updates.
    """
    global p_denoise
    if p_denoise is not None:
        yield ("已有正在进行的语音降噪任务,需先终止才能开启下一次任务", *_busy_buttons(), _update())
        return
    denoise_inp_dir = my_utils.clean_path(denoise_inp_dir)
    denoise_opt_dir = my_utils.clean_path(denoise_opt_dir)
    cmd = '"%s" tools/cmd-denoise.py -i "%s" -o "%s" -p %s' % (
        python_exec, denoise_inp_dir, denoise_opt_dir, "float16" if is_half else "float32")
    yield ("语音降噪任务开启:%s" % cmd, *_busy_buttons(), _update())
    p_denoise = _spawn(cmd)
    p_denoise.wait()
    p_denoise = None
    yield ("语音降噪任务完成, 查看终端进行下一步", *_idle_buttons(), _update(value=denoise_opt_dir))


def close_denoise():
    """Kill the running denoise process, if any, and restore the buttons."""
    global p_denoise
    if p_denoise is not None:
        kill_process(p_denoise.pid)
        p_denoise = None
    return ("已终止语音降噪进程", *_idle_buttons())


p_train_SoVITS = None


def open1Ba(batch_size, total_epoch, exp_name, text_low_lr_rate, if_save_latest, if_save_every_weights, save_every_epoch, gpu_numbers1Ba, pretrained_s2G, pretrained_s2D):
    """Write a temporary s2 config and run SoVITS training until it exits.

    The base config is read from ``GPT_SoVITS/configs/s2.json``; without half
    precision, fp16 is switched off and the batch size halved (minimum 1).
    Yields ``(message, start_button, stop_button)`` updates.
    """
    global p_train_SoVITS
    if p_train_SoVITS is not None:
        yield ("已有正在进行的SoVITS训练任务,需先终止才能开启下一次任务", *_busy_buttons())
        return
    with open("GPT_SoVITS/configs/s2.json") as f:
        data = json.load(f)
    s2_dir = "%s/%s" % (exp_root, exp_name)
    os.makedirs("%s/logs_s2" % (s2_dir), exist_ok=True)
    train_cfg = data["train"]
    if not is_half:
        train_cfg["fp16_run"] = False
        batch_size = max(1, batch_size // 2)
    train_cfg["batch_size"] = batch_size
    train_cfg["epochs"] = total_epoch
    train_cfg["text_low_lr_rate"] = text_low_lr_rate
    train_cfg["pretrained_s2G"] = pretrained_s2G
    train_cfg["pretrained_s2D"] = pretrained_s2D
    train_cfg["if_save_latest"] = if_save_latest
    train_cfg["if_save_every_weights"] = if_save_every_weights
    train_cfg["save_every_epoch"] = save_every_epoch
    train_cfg["gpu_numbers"] = gpu_numbers1Ba
    data["data"]["exp_dir"] = data["s2_ckpt_dir"] = s2_dir
    data["save_weight_dir"] = SoVITS_weight_root
    data["name"] = exp_name
    tmp_config_path = "%s/tmp_s2.json" % tmp
    with open(tmp_config_path, "w") as f:
        json.dump(data, f)

    cmd = '"%s" GPT_SoVITS/s2_train.py --config "%s"' % (python_exec, tmp_config_path)
    yield ("SoVITS训练开始:%s" % cmd, *_busy_buttons())
    p_train_SoVITS = _spawn(cmd)
    p_train_SoVITS.wait()
    p_train_SoVITS = None
    yield ("SoVITS训练完成", *_idle_buttons())


def close1Ba():
    """Kill the running SoVITS training process, if any, and restore the buttons."""
    global p_train_SoVITS
    if p_train_SoVITS is not None:
        kill_process(p_train_SoVITS.pid)
        p_train_SoVITS = None
    return ("已终止SoVITS训练", *_idle_buttons())


p_train_GPT = None


def open1Bb(batch_size, total_epoch, exp_name, if_dpo, if_save_latest, if_save_every_weights, save_every_epoch, gpu_numbers, pretrained_s1):
    """Write a temporary s1 config and run GPT training until it exits.

    The base config is the v1 or v2 ``s1longer`` YAML depending on ``version``;
    without half precision, precision is set to "32" and the batch size halved
    (minimum 1). Yields ``(message, start_button, stop_button)`` updates.
    """
    global p_train_GPT
    if p_train_GPT is not None:
        yield ("已有正在进行的GPT训练任务,需先终止才能开启下一次任务", *_busy_buttons())
        return
    config_path = "GPT_SoVITS/configs/s1longer.yaml" if version == "v1" else "GPT_SoVITS/configs/s1longer-v2.yaml"
    with open(config_path) as f:
        data = yaml.load(f.read(), Loader=yaml.FullLoader)
    s1_dir = "%s/%s" % (exp_root, exp_name)
    os.makedirs("%s/logs_s1" % (s1_dir), exist_ok=True)
    train_cfg = data["train"]
    if not is_half:
        train_cfg["precision"] = "32"
        batch_size = max(1, batch_size // 2)
    train_cfg["batch_size"] = batch_size
    train_cfg["epochs"] = total_epoch
    data["pretrained_s1"] = pretrained_s1
    train_cfg["save_every_n_epoch"] = save_every_epoch
    train_cfg["if_save_every_weights"] = if_save_every_weights
    train_cfg["if_save_latest"] = if_save_latest
    train_cfg["if_dpo"] = if_dpo
    train_cfg["half_weights_save_dir"] = GPT_weight_root
    train_cfg["exp_name"] = exp_name
    data["train_semantic_path"] = "%s/6-name2semantic.tsv" % s1_dir
    data["train_phoneme_path"] = "%s/2-name2text.txt" % s1_dir
    data["output_dir"] = "%s/logs_s1" % s1_dir

    # The UI separates GPU numbers with "-"; the environment variable uses ",".
    os.environ["_CUDA_VISIBLE_DEVICES"] = fix_gpu_numbers(gpu_numbers.replace("-", ","))
    os.environ["hz"] = "25hz"
    tmp_config_path = "%s/tmp_s1.yaml" % tmp
    with open(tmp_config_path, "w") as f:
        f.write(yaml.dump(data, default_flow_style=False))
    cmd = '"%s" GPT_SoVITS/s1_train.py --config_file "%s" ' % (python_exec, tmp_config_path)
    yield ("GPT训练开始:%s" % cmd, *_busy_buttons())
    p_train_GPT = _spawn(cmd)
    p_train_GPT.wait()
    p_train_GPT = None
    yield ("GPT训练完成", *_idle_buttons())


def close1Bb():
    """Kill the running GPT training process, if any, and restore the buttons."""
    global p_train_GPT
    if p_train_GPT is not None:
        kill_process(p_train_GPT.pid)
        p_train_GPT = None
    return ("已终止GPT训练", *_idle_buttons())


ps_slice = []


def open_slice(inp, opt_root, threshold, min_length, min_interval, hop_size, max_sil_kept, _max, alpha, n_parts):
    """Run the audio slicer in ``n_parts`` parallel processes (one for a single file).

    Yields ``(message, start_button, stop_button, path_update, path_update)``;
    on completion both path updates carry ``opt_root``.
    """
    global ps_slice
    inp = my_utils.clean_path(inp)
    opt_root = my_utils.clean_path(opt_root)
    if not os.path.exists(inp):
        yield ("输入路径不存在", *_idle_buttons(), _update(), _update())
        return
    if os.path.isfile(inp):
        n_parts = 1
    elif not os.path.isdir(inp):
        yield ("输入路径存在但既不是文件也不是文件夹", *_idle_buttons(), _update(), _update())
        return
    if ps_slice:
        yield ("已有正在进行的切割任务,需先终止才能开启下一次任务", *_busy_buttons(), _update(), _update())
        return

    procs = ps_slice
    for i_part in range(n_parts):
        cmd = '"%s" tools/slice_audio.py "%s" "%s" %s %s %s %s %s %s %s %s %s' % (
            python_exec, inp, opt_root, threshold, min_length, min_interval,
            hop_size, max_sil_kept, _max, alpha, i_part, n_parts)
        procs.append(_spawn(cmd))
    yield ("切割执行中", *_busy_buttons(), _update(), _update())
    for p in procs:
        p.wait()
    ps_slice = []
    yield ("切割结束", *_idle_buttons(), _update(value=opt_root), _update(value=opt_root))


def close_slice():
    """Kill all running slicer processes and restore the buttons."""
    global ps_slice
    _kill_all(ps_slice)
    ps_slice = []
    return ("已终止所有切割进程", *_idle_buttons())


ps1a = []


def open1a(inp_text, inp_wav_dir, exp_name, gpu_numbers, bert_pretrained_dir):
    """Run step 1a (``1-get-text.py``) on every listed GPU and merge the part files.

    The per-part ``2-name2text-<i>.txt`` files are concatenated into
    ``2-name2text.txt`` under the experiment directory. Yields
    ``(message, start_button, stop_button)`` updates; the final message
    reports failure when the merged text is empty.
    """
    global ps1a
    inp_text = my_utils.clean_path(inp_text)
    inp_wav_dir = my_utils.clean_path(inp_wav_dir)
    if ps1a:
        yield ("已有正在进行的文本任务,需先终止才能开启下一次任务", *_busy_buttons())
        return

    opt_dir = "%s/%s" % (exp_root, exp_name)
    config = {
        "inp_text": inp_text,
        "inp_wav_dir": inp_wav_dir,
        "exp_name": exp_name,
        "opt_dir": opt_dir,
        "bert_pretrained_dir": bert_pretrained_dir,
        "is_half": str(is_half),
    }
    procs = ps1a
    all_parts = _spawn_parts("1-get-text.py", config, gpu_numbers, procs)
    yield ("文本进程执行中", *_busy_buttons())
    for p in procs:
        p.wait()
    # Release the "busy" state before touching the part files, so a missing
    # file cannot leave this stage locked.
    ps1a = []
    opt = _merge_part_files(opt_dir, "2-name2text-%s.txt", all_parts, [])
    path_text = "%s/2-name2text.txt" % opt_dir
    with open(path_text, "w", encoding="utf8") as f:
        f.write("\n".join(opt) + "\n")
    if len("".join(opt)) > 0:
        yield ("文本进程成功", *_idle_buttons())
    else:
        yield ("文本进程失败", *_idle_buttons())


def close1a():
    """Kill all running step-1a processes and restore the buttons."""
    global ps1a
    _kill_all(ps1a)
    ps1a = []
    return ("已终止所有1a进程", *_idle_buttons())


ps1b = []


def open1b(inp_text, inp_wav_dir, exp_name, gpu_numbers, ssl_pretrained_dir):
    """Run step 1b (``2-get-hubert-wav32k.py``) on every listed GPU and wait for it.

    Yields ``(message, start_button, stop_button)`` updates.
    """
    global ps1b
    inp_text = my_utils.clean_path(inp_text)
    inp_wav_dir = my_utils.clean_path(inp_wav_dir)
    if ps1b:
        yield ("已有正在进行的SSL提取任务,需先终止才能开启下一次任务", *_busy_buttons())
        return

    config = {
        "inp_text": inp_text,
        "inp_wav_dir": inp_wav_dir,
        "exp_name": exp_name,
        "opt_dir": "%s/%s" % (exp_root, exp_name),
        "cnhubert_base_dir": ssl_pretrained_dir,
        "is_half": str(is_half),
    }
    procs = ps1b
    _spawn_parts("2-get-hubert-wav32k.py", config, gpu_numbers, procs)
    yield ("SSL提取进程执行中", *_busy_buttons())
    for p in procs:
        p.wait()
    ps1b = []
    yield ("SSL提取进程结束", *_idle_buttons())


def close1b():
    """Kill all running step-1b processes and restore the buttons."""
    global ps1b
    _kill_all(ps1b)
    ps1b = []
    return ("已终止所有1b进程", *_idle_buttons())


ps1c = []


def open1c(inp_text, exp_name, gpu_numbers, pretrained_s2G_path):
    """Run step 1c (``3-get-semantic.py``) on every listed GPU and merge the part files.

    The per-part ``6-name2semantic-<i>.tsv`` files are concatenated, below a
    header row, into ``6-name2semantic.tsv`` under the experiment directory.
    Yields ``(message, start_button, stop_button)`` updates.
    """
    global ps1c
    inp_text = my_utils.clean_path(inp_text)
    if ps1c:
        yield ("已有正在进行的语义token提取任务,需先终止才能开启下一次任务", *_busy_buttons())
        return

    opt_dir = "%s/%s" % (exp_root, exp_name)
    config = {
        "inp_text": inp_text,
        "exp_name": exp_name,
        "opt_dir": opt_dir,
        "pretrained_s2G": pretrained_s2G_path,
        "s2config_path": "GPT_SoVITS/configs/s2.json",
        "is_half": str(is_half),
    }
    procs = ps1c
    all_parts = _spawn_parts("3-get-semantic.py", config, gpu_numbers, procs)
    yield ("语义token提取进程执行中", *_busy_buttons())
    for p in procs:
        p.wait()
    # Release the "busy" state before touching the part files, so a missing
    # file cannot leave this stage locked.
    ps1c = []
    opt = _merge_part_files(opt_dir, "6-name2semantic-%s.tsv", all_parts, ["item_name\tsemantic_audio"])
    path_semantic = "%s/6-name2semantic.tsv" % opt_dir
    with open(path_semantic, "w", encoding="utf8") as f:
        f.write("\n".join(opt) + "\n")
    yield ("语义token提取进程结束", *_idle_buttons())


def close1c():
    """Kill all running step-1c processes and restore the buttons."""
    global ps1c
    _kill_all(ps1c)
    ps1c = []
    return ("已终止所有语义token进程", *_idle_buttons())
#####inp_text,inp_wav_dir,exp_name,gpu_numbers1a,gpu_numbers1Ba,gpu_numbers1c,bert_pretrained_dir,cnhubert_base_dir,pretrained_s2G ps1abc=[] def open1abc(inp_text,inp_wav_dir,exp_name,gpu_numbers1a,gpu_numbers1Ba,gpu_numbers1c,bert_pretrained_dir,ssl_pretrained_dir,pretrained_s2G_path): global ps1abc inp_text = my_utils.clean_path(inp_text) inp_wav_dir = my_utils.clean_path(inp_wav_dir) if (ps1abc == []): opt_dir="%s/%s"%(exp_root,exp_name) try: #############################1a path_text="%s/2-name2text.txt" % opt_dir if(os.path.exists(path_text)==False or (os.path.exists(path_text)==True and len(open(path_text,"r",encoding="utf8").read().strip("\n").split("\n"))<2)): config={ "inp_text":inp_text, "inp_wav_dir":inp_wav_dir, "exp_name":exp_name, "opt_dir":opt_dir, "bert_pretrained_dir":bert_pretrained_dir, "is_half": str(is_half) } gpu_names=gpu_numbers1a.split("-") all_parts=len(gpu_names) for i_part in range(all_parts): config.update( { "i_part": str(i_part), "all_parts": str(all_parts), "_CUDA_VISIBLE_DEVICES": fix_gpu_number(gpu_names[i_part]), } ) os.environ.update(config) cmd = '"%s" GPT_SoVITS/prepare_datasets/1-get-text.py'%python_exec print(cmd) p = Popen(cmd, shell=True) ps1abc.append(p) yield "进度:1a-ing", {"__type__": "update", "visible": False}, {"__type__": "update", "visible": True} for p in ps1abc:p.wait() opt = [] for i_part in range(all_parts):#txt_path="%s/2-name2text-%s.txt"%(opt_dir,i_part) txt_path = "%s/2-name2text-%s.txt" % (opt_dir, i_part) with open(txt_path, "r",encoding="utf8") as f: opt += f.read().strip("\n").split("\n") os.remove(txt_path) with open(path_text, "w",encoding="utf8") as f: f.write("\n".join(opt) + "\n") assert len("".join(opt)) > 0, "1Aa-文本获取进程失败" yield "进度:1a-done", {"__type__": "update", "visible": False}, {"__type__": "update", "visible": True} ps1abc=[] #############################1b config={ "inp_text":inp_text, "inp_wav_dir":inp_wav_dir, "exp_name":exp_name, "opt_dir":opt_dir, "cnhubert_base_dir":ssl_pretrained_dir, 
} gpu_names=gpu_numbers1Ba.split("-") all_parts=len(gpu_names) for i_part in range(all_parts): config.update( { "i_part": str(i_part), "all_parts": str(all_parts), "_CUDA_VISIBLE_DEVICES": fix_gpu_number(gpu_names[i_part]), } ) os.environ.update(config) cmd = '"%s" GPT_SoVITS/prepare_datasets/2-get-hubert-wav32k.py'%python_exec print(cmd) p = Popen(cmd, shell=True) ps1abc.append(p) yield "进度:1a-done, 1b-ing", {"__type__": "update", "visible": False}, {"__type__": "update", "visible": True} for p in ps1abc:p.wait() yield "进度:1a1b-done", {"__type__": "update", "visible": False}, {"__type__": "update", "visible": True} ps1abc=[] #############################1c path_semantic = "%s/6-name2semantic.tsv" % opt_dir if(os.path.exists(path_semantic)==False or (os.path.exists(path_semantic)==True and os.path.getsize(path_semantic)<31)): config={ "inp_text":inp_text, "exp_name":exp_name, "opt_dir":opt_dir, "pretrained_s2G":pretrained_s2G_path, "s2config_path":"GPT_SoVITS/configs/s2.json", } gpu_names=gpu_numbers1c.split("-") all_parts=len(gpu_names) for i_part in range(all_parts): config.update( { "i_part": str(i_part), "all_parts": str(all_parts), "_CUDA_VISIBLE_DEVICES": fix_gpu_number(gpu_names[i_part]), } ) os.environ.update(config) cmd = '"%s" GPT_SoVITS/prepare_datasets/3-get-semantic.py'%python_exec print(cmd) p = Popen(cmd, shell=True) ps1abc.append(p) yield "进度:1a1b-done, 1cing", {"__type__": "update", "visible": False}, {"__type__": "update", "visible": True} for p in ps1abc:p.wait() opt = ["item_name\tsemantic_audio"] for i_part in range(all_parts): semantic_path = "%s/6-name2semantic-%s.tsv" % (opt_dir, i_part) with open(semantic_path, "r",encoding="utf8") as f: opt += f.read().strip("\n").split("\n") os.remove(semantic_path) with open(path_semantic, "w",encoding="utf8") as f: f.write("\n".join(opt) + "\n") yield "进度:all-done", {"__type__": "update", "visible": False}, {"__type__": "update", "visible": True} ps1abc = [] yield "一键三连进程结束", {"__type__": "update", 
"visible": True}, {"__type__": "update", "visible": False} except: traceback.print_exc() close1abc() yield "一键三连中途报错", {"__type__": "update", "visible": True}, {"__type__": "update", "visible": False} else: yield "已有正在进行的一键三连任务,需先终止才能开启下一次任务", {"__type__": "update", "visible": False}, {"__type__": "update", "visible": True} def close1abc(): global ps1abc if (ps1abc != []): for p1abc in ps1abc: try: kill_process(p1abc.pid) except: traceback.print_exc() ps1abc=[] return "已终止所有一键三连进程", {"__type__": "update", "visible": True}, {"__type__": "update", "visible": False} with gr.Blocks(title="GPT-SoVITS WebUI") as app: gr.Markdown( value= i18n("本软件以MIT协议开源, 作者不对软件具备任何控制力, 使用软件者、传播软件导出的声音者自负全责.
如不认可该条款, 则不能使用或引用软件包内任何代码和文件. 详见根目录LICENSE.") ) gr.Markdown( value= i18n("中文教程文档:https://www.yuque.com/baicaigongchang1145haoyuangong/ib3g1e") ) with gr.Tabs(): with gr.TabItem(i18n("0-前置数据集获取工具")):#提前随机切片防止uvr5爆内存->uvr5->slicer->asr->打标 gr.Markdown(value=i18n("0a-UVR5人声伴奏分离&去混响去延迟工具")) with gr.Row(): if_uvr5 = gr.Checkbox(label=i18n("是否开启UVR5-WebUI"),show_label=True) uvr5_info = gr.Textbox(label=i18n("UVR5进程输出信息")) gr.Markdown(value=i18n("0b-语音切分工具")) with gr.Row(): with gr.Column(scale=3): with gr.Row(): slice_inp_path=gr.Textbox(label=i18n("音频自动切分输入路径,可文件可文件夹"),value="") slice_opt_root=gr.Textbox(label=i18n("切分后的子音频的输出根目录"),value="output/slicer_opt") with gr.Row(): threshold=gr.Textbox(label=i18n("threshold:音量小于这个值视作静音的备选切割点"),value="-34") min_length=gr.Textbox(label=i18n("min_length:每段最小多长,如果第一段太短一直和后面段连起来直到超过这个值"),value="4000") min_interval=gr.Textbox(label=i18n("min_interval:最短切割间隔"),value="300") hop_size=gr.Textbox(label=i18n("hop_size:怎么算音量曲线,越小精度越大计算量越高(不是精度越大效果越好)"),value="10") max_sil_kept=gr.Textbox(label=i18n("max_sil_kept:切完后静音最多留多长"),value="500") with gr.Row(): _max=gr.Slider(minimum=0,maximum=1,step=0.05,label=i18n("max:归一化后最大值多少"),value=0.9,interactive=True) alpha=gr.Slider(minimum=0,maximum=1,step=0.05,label=i18n("alpha_mix:混多少比例归一化后音频进来"),value=0.25,interactive=True) n_process=gr.Slider(minimum=1,maximum=n_cpu,step=1,label=i18n("切割使用的进程数"),value=4,interactive=True) with gr.Row(): slicer_info = gr.Textbox(label=i18n("语音切割进程输出信息")) open_slicer_button=gr.Button(i18n("开启语音切割"), variant="primary",visible=True) close_slicer_button=gr.Button(i18n("终止语音切割"), variant="primary",visible=False) gr.Markdown(value=i18n("0bb-语音降噪工具")) with gr.Row(): with gr.Column(scale=3): with gr.Row(): denoise_input_dir=gr.Textbox(label=i18n("降噪音频文件输入文件夹"),value="") denoise_output_dir=gr.Textbox(label=i18n("降噪结果输出文件夹"),value="output/denoise_opt") with gr.Row(): denoise_info = gr.Textbox(label=i18n("语音降噪进程输出信息")) open_denoise_button = gr.Button(i18n("开启语音降噪"), 
variant="primary",visible=True) close_denoise_button = gr.Button(i18n("终止语音降噪进程"), variant="primary",visible=False) gr.Markdown(value=i18n("0c-中文批量离线ASR工具")) with gr.Row(): with gr.Column(scale=3): with gr.Row(): asr_inp_dir = gr.Textbox( label=i18n("输入文件夹路径"), value="D:\\GPT-SoVITS\\raw\\xxx", interactive=True, ) asr_opt_dir = gr.Textbox( label = i18n("输出文件夹路径"), value = "output/asr_opt", interactive = True, ) with gr.Row(): asr_model = gr.Dropdown( label = i18n("ASR 模型"), choices = list(asr_dict.keys()), interactive = True, value="达摩 ASR (中文)" ) asr_size = gr.Dropdown( label = i18n("ASR 模型尺寸"), choices = ["large"], interactive = True, value="large" ) asr_lang = gr.Dropdown( label = i18n("ASR 语言设置"), choices = ["zh"], interactive = True, value="zh" ) asr_precision = gr.Dropdown( label = i18n("数据类型精度"), choices = ["float32"], interactive = True, value="float32" ) with gr.Row(): asr_info = gr.Textbox(label=i18n("ASR进程输出信息")) open_asr_button = gr.Button(i18n("开启离线批量ASR"), variant="primary",visible=True) close_asr_button = gr.Button(i18n("终止ASR进程"), variant="primary",visible=False) def change_lang_choices(key): #根据选择的模型修改可选的语言 # return gr.Dropdown(choices=asr_dict[key]['lang']) return {"__type__": "update", "choices": asr_dict[key]['lang'],"value":asr_dict[key]['lang'][0]} def change_size_choices(key): # 根据选择的模型修改可选的模型尺寸 # return gr.Dropdown(choices=asr_dict[key]['size']) return {"__type__": "update", "choices": asr_dict[key]['size'],"value":asr_dict[key]['size'][-1]} def change_precision_choices(key): #根据选择的模型修改可选的语言 if key =="Faster Whisper (多语种)": if default_batch_size <= 4: precision = 'int8' elif is_half: precision = 'float16' else: precision = 'float32' else: precision = 'float32' # return gr.Dropdown(choices=asr_dict[key]['precision']) return {"__type__": "update", "choices": asr_dict[key]['precision'],"value":precision} asr_model.change(change_lang_choices, [asr_model], [asr_lang]) asr_model.change(change_size_choices, [asr_model], [asr_size]) 
asr_model.change(change_precision_choices, [asr_model], [asr_precision]) gr.Markdown(value=i18n("0d-语音文本校对标注工具")) with gr.Row(): if_label = gr.Checkbox(label=i18n("是否开启打标WebUI"),show_label=True) path_list = gr.Textbox( label=i18n(".list标注文件的路径"), value="D:\\RVC1006\\GPT-SoVITS\\raw\\xxx.list", interactive=True, ) label_info = gr.Textbox(label=i18n("打标工具进程输出信息")) if_label.change(change_label, [if_label,path_list], [label_info]) if_uvr5.change(change_uvr5, [if_uvr5], [uvr5_info]) open_asr_button.click(open_asr, [asr_inp_dir, asr_opt_dir, asr_model, asr_size, asr_lang, asr_precision], [asr_info,open_asr_button,close_asr_button,path_list]) close_asr_button.click(close_asr, [], [asr_info,open_asr_button,close_asr_button]) open_slicer_button.click(open_slice, [slice_inp_path,slice_opt_root,threshold,min_length,min_interval,hop_size,max_sil_kept,_max,alpha,n_process], [slicer_info,open_slicer_button,close_slicer_button,asr_inp_dir,denoise_input_dir]) close_slicer_button.click(close_slice, [], [slicer_info,open_slicer_button,close_slicer_button]) open_denoise_button.click(open_denoise, [denoise_input_dir,denoise_output_dir], [denoise_info,open_denoise_button,close_denoise_button,asr_inp_dir]) close_denoise_button.click(close_denoise, [], [denoise_info,open_denoise_button,close_denoise_button]) with gr.TabItem(i18n("1-GPT-SoVITS-TTS")): with gr.Row(): exp_name = gr.Textbox(label=i18n("*实验/模型名"), value="xxx", interactive=True) gpu_info = gr.Textbox(label=i18n("显卡信息"), value=gpu_info, visible=True, interactive=False) pretrained_s2G = gr.Textbox(label=i18n("预训练的SoVITS-G模型路径"), value=pretrained_sovits_name, interactive=True) pretrained_s2D = gr.Textbox(label=i18n("预训练的SoVITS-D模型路径"), value=pretrained_sovits_name.replace("s2G","s2D"), interactive=True) pretrained_s1 = gr.Textbox(label=i18n("预训练的GPT模型路径"), value=pretrained_gpt_name, interactive=True) with gr.TabItem(i18n("1A-训练集格式化工具")): gr.Markdown(value=i18n("输出logs/实验名目录下应有23456开头的文件和文件夹")) with gr.Row(): inp_text = 
gr.Textbox(label=i18n("*文本标注文件"),value=r"D:\RVC1006\GPT-SoVITS\raw\xxx.list",interactive=True) inp_wav_dir = gr.Textbox( label=i18n("*训练集音频文件目录"), # value=r"D:\RVC1006\GPT-SoVITS\raw\xxx", interactive=True, placeholder=i18n("填切割后音频所在目录!读取的音频文件完整路径=该目录-拼接-list文件里波形对应的文件名(不是全路径)。如果留空则使用.list文件里的绝对全路径。") ) gr.Markdown(value=i18n("1Aa-文本内容")) with gr.Row(): gpu_numbers1a = gr.Textbox(label=i18n("GPU卡号以-分割,每个卡号一个进程"),value="%s-%s"%(gpus,gpus),interactive=True) bert_pretrained_dir = gr.Textbox(label=i18n("预训练的中文BERT模型路径"),value="GPT_SoVITS/pretrained_models/chinese-roberta-wwm-ext-large",interactive=False) button1a_open = gr.Button(i18n("开启文本获取"), variant="primary",visible=True) button1a_close = gr.Button(i18n("终止文本获取进程"), variant="primary",visible=False) info1a=gr.Textbox(label=i18n("文本进程输出信息")) gr.Markdown(value=i18n("1Ab-SSL自监督特征提取")) with gr.Row(): gpu_numbers1Ba = gr.Textbox(label=i18n("GPU卡号以-分割,每个卡号一个进程"),value="%s-%s"%(gpus,gpus),interactive=True) cnhubert_base_dir = gr.Textbox(label=i18n("预训练的SSL模型路径"),value="GPT_SoVITS/pretrained_models/chinese-hubert-base",interactive=False) button1b_open = gr.Button(i18n("开启SSL提取"), variant="primary",visible=True) button1b_close = gr.Button(i18n("终止SSL提取进程"), variant="primary",visible=False) info1b=gr.Textbox(label=i18n("SSL进程输出信息")) gr.Markdown(value=i18n("1Ac-语义token提取")) with gr.Row(): gpu_numbers1c = gr.Textbox(label=i18n("GPU卡号以-分割,每个卡号一个进程"),value="%s-%s"%(gpus,gpus),interactive=True) button1c_open = gr.Button(i18n("开启语义token提取"), variant="primary",visible=True) button1c_close = gr.Button(i18n("终止语义token提取进程"), variant="primary",visible=False) info1c=gr.Textbox(label=i18n("语义token提取进程输出信息")) gr.Markdown(value=i18n("1Aabc-训练集格式化一键三连")) with gr.Row(): button1abc_open = gr.Button(i18n("开启一键三连"), variant="primary",visible=True) button1abc_close = gr.Button(i18n("终止一键三连"), variant="primary",visible=False) info1abc=gr.Textbox(label=i18n("一键三连进程输出信息")) button1a_open.click(open1a, 
[inp_text,inp_wav_dir,exp_name,gpu_numbers1a,bert_pretrained_dir], [info1a,button1a_open,button1a_close]) button1a_close.click(close1a, [], [info1a,button1a_open,button1a_close]) button1b_open.click(open1b, [inp_text,inp_wav_dir,exp_name,gpu_numbers1Ba,cnhubert_base_dir], [info1b,button1b_open,button1b_close]) button1b_close.click(close1b, [], [info1b,button1b_open,button1b_close]) button1c_open.click(open1c, [inp_text,exp_name,gpu_numbers1c,pretrained_s2G], [info1c,button1c_open,button1c_close]) button1c_close.click(close1c, [], [info1c,button1c_open,button1c_close]) button1abc_open.click(open1abc, [inp_text,inp_wav_dir,exp_name,gpu_numbers1a,gpu_numbers1Ba,gpu_numbers1c,bert_pretrained_dir,cnhubert_base_dir,pretrained_s2G], [info1abc,button1abc_open,button1abc_close]) button1abc_close.click(close1abc, [], [info1abc,button1abc_open,button1abc_close]) with gr.TabItem(i18n("1B-微调训练")): gr.Markdown(value=i18n("1Ba-SoVITS训练。用于分享的模型文件输出在SoVITS_weights下。")) with gr.Row(): batch_size = gr.Slider(minimum=1,maximum=40,step=1,label=i18n("每张显卡的batch_size"),value=default_batch_size,interactive=True) total_epoch = gr.Slider(minimum=1,maximum=25,step=1,label=i18n("总训练轮数total_epoch,不建议太高"),value=8,interactive=True) text_low_lr_rate = gr.Slider(minimum=0.2,maximum=0.6,step=0.05,label=i18n("文本模块学习率权重"),value=0.4,interactive=True) save_every_epoch = gr.Slider(minimum=1,maximum=25,step=1,label=i18n("保存频率save_every_epoch"),value=4,interactive=True) if_save_latest = gr.Checkbox(label=i18n("是否仅保存最新的ckpt文件以节省硬盘空间"), value=True, interactive=True, show_label=True) if_save_every_weights = gr.Checkbox(label=i18n("是否在每次保存时间点将最终小模型保存至weights文件夹"), value=True, interactive=True, show_label=True) gpu_numbers1Ba = gr.Textbox(label=i18n("GPU卡号以-分割,每个卡号一个进程"), value="%s" % (gpus), interactive=True) with gr.Row(): button1Ba_open = gr.Button(i18n("开启SoVITS训练"), variant="primary",visible=True) button1Ba_close = gr.Button(i18n("终止SoVITS训练"), variant="primary",visible=False) 
info1Ba=gr.Textbox(label=i18n("SoVITS训练进程输出信息")) gr.Markdown(value=i18n("1Bb-GPT训练。用于分享的模型文件输出在GPT_weights下。")) with gr.Row(): batch_size1Bb = gr.Slider(minimum=1,maximum=40,step=1,label=i18n("每张显卡的batch_size"),value=default_batch_size,interactive=True) total_epoch1Bb = gr.Slider(minimum=2,maximum=50,step=1,label=i18n("总训练轮数total_epoch"),value=15,interactive=True) if_dpo = gr.Checkbox(label=i18n("是否开启dpo训练选项(实验性)"), value=False, interactive=True, show_label=True) if_save_latest1Bb = gr.Checkbox(label=i18n("是否仅保存最新的ckpt文件以节省硬盘空间"), value=True, interactive=True, show_label=True) if_save_every_weights1Bb = gr.Checkbox(label=i18n("是否在每次保存时间点将最终小模型保存至weights文件夹"), value=True, interactive=True, show_label=True) save_every_epoch1Bb = gr.Slider(minimum=1,maximum=50,step=1,label=i18n("保存频率save_every_epoch"),value=5,interactive=True) gpu_numbers1Bb = gr.Textbox(label=i18n("GPU卡号以-分割,每个卡号一个进程"), value="%s" % (gpus), interactive=True) with gr.Row(): button1Bb_open = gr.Button(i18n("开启GPT训练"), variant="primary",visible=True) button1Bb_close = gr.Button(i18n("终止GPT训练"), variant="primary",visible=False) info1Bb=gr.Textbox(label=i18n("GPT训练进程输出信息")) button1Ba_open.click(open1Ba, [batch_size,total_epoch,exp_name,text_low_lr_rate,if_save_latest,if_save_every_weights,save_every_epoch,gpu_numbers1Ba,pretrained_s2G,pretrained_s2D], [info1Ba,button1Ba_open,button1Ba_close]) button1Ba_close.click(close1Ba, [], [info1Ba,button1Ba_open,button1Ba_close]) button1Bb_open.click(open1Bb, [batch_size1Bb,total_epoch1Bb,exp_name,if_dpo,if_save_latest1Bb,if_save_every_weights1Bb,save_every_epoch1Bb,gpu_numbers1Bb,pretrained_s1], [info1Bb,button1Bb_open,button1Bb_close]) button1Bb_close.click(close1Bb, [], [info1Bb,button1Bb_open,button1Bb_close]) with gr.TabItem(i18n("1C-推理")): gr.Markdown(value=i18n("选择训练完存放在SoVITS_weights和GPT_weights下的模型。默认的一个是底模,体验5秒Zero Shot TTS用。")) with gr.Row(): GPT_dropdown = gr.Dropdown(label=i18n("*GPT模型列表"), 
choices=sorted(GPT_names,key=custom_sort_key),value=pretrained_gpt_name,interactive=True) SoVITS_dropdown = gr.Dropdown(label=i18n("*SoVITS模型列表"), choices=sorted(SoVITS_names,key=custom_sort_key),value=pretrained_sovits_name,interactive=True) gpu_number_1C=gr.Textbox(label=i18n("GPU卡号,只能填1个整数"), value=gpus, interactive=True) refresh_button = gr.Button(i18n("刷新模型路径"), variant="primary") refresh_button.click(fn=change_choices,inputs=[],outputs=[SoVITS_dropdown,GPT_dropdown]) with gr.Row(): if_tts = gr.Checkbox(label=i18n("是否开启TTS推理WebUI"), show_label=True) tts_info = gr.Textbox(label=i18n("TTS推理WebUI进程输出信息")) if_tts.change(change_tts_inference, [if_tts,bert_pretrained_dir,cnhubert_base_dir,gpu_number_1C,GPT_dropdown,SoVITS_dropdown], [tts_info]) with gr.TabItem(i18n("2-GPT-SoVITS-变声")):gr.Markdown(value=i18n("施工中,请静候佳音")) # app.queue(concurrency_count=511, max_size=1022).launch( # server_name="0.0.0.0", # inbrowser=True, # share=is_share, # server_port=webui_port_main, # quiet=True, # ) # button1abc_open.click(open1abc, [inp_text,inp_wav_dir,exp_name,gpu_numbers1a,gpu_numbers1Ba,gpu_numbers1c,bert_pretrained_dir,cnhubert_base_dir,pretrained_s2G], [info1abc,button1abc_open,button1abc_close]) # open1abc(./*/transcript.txt, None, *, "0-1", "0-1", "0-1", "GPT_SoVITS/pretrained_models/chinese-roberta-wwm-ext-large", "GPT_SoVITS/pretrained_models/chinese-hubert-base", "GPT_SoVITS/pretrained_models/s2G2333k.pth") import os import sys # to avoid the modified user.pth file cnhubert_base_path = "GPT_SoVITS\pretrained_models\chinese-hubert-base" bert_path = "GPT_SoVITS\pretrained_models\chinese-roberta-wwm-ext-large" os.environ["version"] = 'v2' now_dir = os.getcwd() sys.path.insert(0, now_dir) import gradio as gr from transformers import AutoModelForMaskedLM, AutoTokenizer import numpy as np from pathlib import Path import os,librosa,torch from scipy.io.wavfile import write as wavwrite from GPT_SoVITS.feature_extractor import cnhubert 
# Point the cnhubert module at the local SSL checkpoint before any model is built.
cnhubert.cnhubert_base_path = cnhubert_base_path
from GPT_SoVITS.module.models import SynthesizerTrn
from GPT_SoVITS.AR.models.t2s_lightning_module import Text2SemanticLightningModule
from GPT_SoVITS.text import cleaned_text_to_sequence
from GPT_SoVITS.text.cleaner import clean_text
import GPT_SoVITS.utils  # NOTE(review): presumably needed so torch.load can unpickle checkpoint configs - confirm
from time import time as ttime
from GPT_SoVITS.module.mel_processing import spectrogram_torch
import re
import tempfile
from tools.my_utils import load_audio
import os
import json
################ End strange import and user.pth modification ################

# import pyopenjtalk
# cwd = os.getcwd()
# if os.path.exists(os.path.join(cwd,'user.dic')):
#     pyopenjtalk.update_global_jtalk_with_user_dict(os.path.join(cwd, 'user.dic'))

import logging

logging.getLogger('httpx').setLevel(logging.WARNING)
logging.getLogger('httpcore').setLevel(logging.WARNING)
logging.getLogger('multipart').setLevel(logging.WARNING)

device = "cuda" if torch.cuda.is_available() else "cpu"
# device = "cpu"
is_half = False

tokenizer = AutoTokenizer.from_pretrained(bert_path)
bert_model = AutoModelForMaskedLM.from_pretrained(bert_path)
if is_half:
    bert_model = bert_model.half()
bert_model = bert_model.to(device)


def get_bert_feature(text, word2ph):
    """Compute phone-level BERT features for ``text`` (BERT, not HuBERT).

    Args:
        text: Normalized text; must have one entry per element of ``word2ph``.
        word2ph: For each character of ``text``, the number of phones it maps to.

    Returns:
        A tensor whose columns are per-phone features: each character's hidden
        state (third-from-last layer, first and last token positions dropped)
        repeated ``word2ph[i]`` times, then transposed.

    Raises:
        AssertionError: If ``len(word2ph) != len(text)``.
    """
    with torch.no_grad():
        inputs = tokenizer(text, return_tensors="pt")
        # Inputs are integer ids, so precision is decided by bert_model alone.
        for key in inputs:
            inputs[key] = inputs[key].to(device)
        res = bert_model(**inputs, output_hidden_states=True)
        res = torch.cat(res["hidden_states"][-3:-2], -1)[0].cpu()[1:-1]
    assert len(word2ph) == len(text)
    phone_level_feature = [
        res[i].repeat(count, 1) for i, count in enumerate(word2ph)
    ]
    phone_level_feature = torch.cat(phone_level_feature, dim=0)
    return phone_level_feature.T


# Caches of already-loaded checkpoints: lists of (path, state_dict, model).
loaded_sovits_model = []
loaded_gpt_model = []

ssl_model = cnhubert.get_model()
if is_half:
    ssl_model = ssl_model.half()
ssl_model = ssl_model.to(device)


class DictToAttrRecursive:
    """Expose the keys of a (possibly nested) dict as attributes."""

    def __init__(self, input_dict):
        for key, value in input_dict.items():
            if isinstance(value, dict):
                # Nested dicts are wrapped recursively.
                setattr(self, key, DictToAttrRecursive(value))
            else:
                setattr(self, key, value)


def _find_cached(cache, wanted_path):
    """Return ``(state_dict, model)`` cached for ``wanted_path`` or ``(None, None)``."""
    for path, state, model in cache:
        if path == wanted_path:
            return state, model
    return None, None


def load_model(sovits_path, gpt_path):
    """Load (or fetch from cache) the SoVITS and GPT models for one voice.

    Args:
        sovits_path: Path of the SoVITS checkpoint (read with ``torch.load``).
        gpt_path: Path of the GPT checkpoint (read with ``torch.load``).

    Returns:
        ``(vq_model, ssl_model, t2s_model, hps, config, hz, max_sec)`` where
        ``hps`` is the attribute-style SoVITS config, ``config`` the raw GPT
        config, ``hz`` is fixed at 50 and ``max_sec`` comes from
        ``config['data']['max_sec']``.
    """
    dict_s2, vq_model = _find_cached(loaded_sovits_model, sovits_path)
    dict_s1, t2s_model = _find_cached(loaded_gpt_model, gpt_path)

    if dict_s2 is None:
        dict_s2 = torch.load(sovits_path, map_location="cpu")
    if dict_s1 is None:
        dict_s1 = torch.load(gpt_path, map_location="cpu")

    # Derive the configs from the state dicts whether they were cached or not.
    config = dict_s1["config"]
    hps = DictToAttrRecursive(dict_s2["config"])
    hps.model.semantic_frame_rate = "25hz"

    if vq_model is None:
        # NOTE(review): ``**hps.model`` requires the nested config object to
        # support the mapping protocol - confirm for plain-dict configs.
        vq_model = SynthesizerTrn(
            hps.data.filter_length // 2 + 1,
            hps.train.segment_size // hps.data.hop_length,
            n_speakers=hps.data.n_speakers,
            **hps.model)
        if is_half:
            vq_model = vq_model.half()
        vq_model = vq_model.to(device)
        vq_model.eval()
        vq_model.load_state_dict(dict_s2["weight"], strict=False)
        loaded_sovits_model.append((sovits_path, dict_s2, vq_model))

    hz = 50
    max_sec = config['data']['max_sec']

    if t2s_model is None:
        t2s_model = Text2SemanticLightningModule(config, "ojbk", is_train=False)
        t2s_model.load_state_dict(dict_s1["weight"])
        if is_half:
            t2s_model = t2s_model.half()
        t2s_model = t2s_model.to(device)
        t2s_model.eval()
        loaded_gpt_model.append((gpt_path, dict_s1, t2s_model))

    return vq_model, ssl_model, t2s_model, hps, config, hz, max_sec


def get_spepc(hps, filename):
    """Load ``filename`` at the model sampling rate and return its spectrogram.

    The waveform is peak-normalized first; a silent file is left as zeros
    instead of being divided by zero.
    """
    audio = load_audio(filename, int(hps.data.sampling_rate))
    peak = np.max(np.abs(audio))
    if peak > 0:
        audio = audio / peak
    audio_norm = torch.FloatTensor(audio).unsqueeze(0)
    spec = spectrogram_torch(
        audio_norm,
        hps.data.filter_length,
        hps.data.sampling_rate,
        hps.data.hop_length,
        hps.data.win_length,
        center=False,
    )
    return spec


def _filename_fragment(text, limit=8):
    """Return the first ``limit`` characters of ``text`` made safe for a file name."""
    return re.sub(r"[^\w-]", "_", (text or "")[:limit])


def create_tts_fn(vq_model, ssl_model, t2s_model, hps, config, hz, max_sec):
    """Build the Gradio inference callback bound to one set of loaded models.

    Returns:
        ``tts_fn(ref_wav_path, prompt_text, prompt_language, target_phone,
        text_language, target_text=None)`` which always returns a 3-tuple
        ``(message, audio, filename)``; ``audio`` and ``filename`` are ``None``
        on failure.
    """

    def tts_fn(ref_wav_path, prompt_text, prompt_language, target_phone,
               text_language, target_text=None):
        prompt_text = prompt_text.strip()

        # Extract the semantic prompt from the reference audio.
        with torch.no_grad():
            wav16k, _ = librosa.load(ref_wav_path, sr=16000, mono=False)
            if wav16k.ndim == 2:
                # Down-mix the first two channels to mono.
                wav16k = (wav16k[0] + wav16k[1]) / 2
            # Append trailing silence.
            # NOTE(review): the pad length uses hps.data.sampling_rate although
            # the waveform is 16 kHz - confirm the intended duration.
            wav16k = np.concatenate(
                [wav16k, np.zeros(int(hps.data.sampling_rate * 0.3))])
            wav16k = torch.from_numpy(wav16k).float()
            if is_half:
                wav16k = wav16k.half()
            wav16k = wav16k.to(device)
            ssl_content = ssl_model.model(
                wav16k.unsqueeze(0))["last_hidden_state"].transpose(1, 2)
            codes = vq_model.extract_latent(ssl_content)
            prompt_semantic = codes[0, 0]

        phones1, word2ph1, norm_text1 = clean_text(prompt_text, prompt_language)
        phones1 = cleaned_text_to_sequence(phones1)

        audio_opt = []
        zero_wav = np.zeros(
            (2, int(hps.data.sampling_rate * 0.3)),
            dtype=np.float16 if is_half else np.float32)

        # The reference spectrogram does not depend on the target sentence.
        refer = get_spepc(hps, ref_wav_path)
        if is_half:
            refer = refer.half()
        refer = refer.to(device)

        for phones2 in get_phone_from_str_list(target_phone, text_language):
            if not phones2 or phones2 == [""]:
                continue
            phones2 = cleaned_text_to_sequence(phones2)

            # BERT features are replaced by zeros for both prompt and target.
            bert1 = torch.zeros(
                (1024, len(phones1)),
                dtype=torch.float16 if is_half else torch.float32).to(device)
            bert2 = torch.zeros((1024, len(phones2))).to(bert1)
            bert = torch.cat([bert1, bert2], 1)

            all_phoneme_ids = torch.LongTensor(
                phones1 + phones2).to(device).unsqueeze(0)
            bert = bert.to(device).unsqueeze(0)
            all_phoneme_len = torch.tensor(
                [all_phoneme_ids.shape[-1]]).to(device)
            prompt = prompt_semantic.unsqueeze(0).to(device)

            # Retry once when the model predicts nothing (idx == 0).
            idx = 0
            cnt = 0
            while idx == 0 and cnt < 2:
                with torch.no_grad():
                    pred_semantic, idx = t2s_model.model.infer_panel(
                        all_phoneme_ids,
                        all_phoneme_len,
                        prompt,
                        bert,
                        top_k=config['inference']['top_k'],
                        early_stop_num=hz * max_sec)
                cnt += 1
            if idx == 0:
                # Three values, matching the three outputs wired to this callback.
                return "Error: Generation failure: bad zero prediction.", None, None

            pred_semantic = pred_semantic[:, -idx:].unsqueeze(0)
            # Decode with the target phones only (prompt part excluded).
            audio = vq_model.decode(
                pred_semantic,
                torch.LongTensor(phones2).to(device).unsqueeze(0),
                refer).detach().cpu().numpy()[0, 0]
            # Duplicate the mono signal into two equal channels.
            direction = np.array([1, 1])
            audio = np.expand_dims(audio, 0) * direction[:, np.newaxis]
            audio_opt.append(audio)
            audio_opt.append(zero_wav)

        if not audio_opt:
            return "Error: No phonemes to synthesize; click \"Clean Text\" first.", None, None

        # Clip before the int16 cast so full-scale samples cannot wrap around.
        samples = np.clip(
            np.concatenate(audio_opt, axis=1) * 32768, -32768, 32767)
        audio = (hps.data.sampling_rate, samples.astype(np.int16).T)

        prefix_1 = _filename_fragment(prompt_text)
        prefix_2 = _filename_fragment(target_text)
        # mkstemp creates the file atomically, unlike the deprecated mktemp.
        fd, filename = tempfile.mkstemp(
            suffix=".wav", prefix=f"{prefix_1}_{prefix_2}_")
        os.close(fd)
        wavwrite(filename, audio[0], audio[1])
        return "Success", audio, filename

    return tts_fn


def get_str_list_from_phone(text, text_language):
    """Run g2p on raw text and return the phones as a string.

    ``text`` is a paragraph that may contain several sentences separated by
    newlines; the result keeps one line per sentence with phones separated by
    single spaces.
    """
    print(text)
    phone_list = []
    for sentence in text.split("\n"):
        phones2, word2ph2, norm_text2 = clean_text(sentence, text_language)
        phone_list.append(" ".join(phones2))
    return "\n".join(phone_list)


def get_phone_from_str_list(str_list: str, language: str = 'ja'):
    """Parse a phone string back into a list of phone lists.

    Sentences are separated by newlines and phones by single spaces.
    ``language`` is accepted for interface compatibility and is not used.
    """
    return [sentence.split(" ") for sentence in str_list.split("\n")]


# Sentence-ending punctuation (ellipsis is not treated specially).
splits = {",", "。", "?", "!", ",", ".", "?", "!", "~", ":", ":", "—", "…", }


def split(todo_text):
    """Split ``todo_text`` into pieces that each end with a punctuation mark.

    A trailing "。" is appended when the text does not already end with
    punctuation. An empty input yields an empty list.
    """
    if not todo_text:
        return []
    todo_text = todo_text.replace("……", "。").replace("——", ",")
    if todo_text[-1] not in splits:
        todo_text += "。"
    todo_texts = []
    piece_start = 0
    for position, char in enumerate(todo_text):
        if char in splits:
            todo_texts.append(todo_text[piece_start:position + 1])
            piece_start = position + 1
    return todo_texts


def change_reference_audio(prompt_text, transcripts):
    """Return the reference audio path registered for ``prompt_text``."""
    return transcripts[prompt_text]


# Build one TTS callback per model described in models_info.json.
models = []
with open("./models/models_info.json", "r", encoding="utf-8") as info_file:
    models_info = json.load(info_file)

for i, info in models_info.items():
    title = info['title']
    cover = info['cover']
    gpt_weight = info['gpt_weight']
    sovits_weight = info['sovits_weight']
    example_reference = info['example_reference']
    transcripts = {}
    transcript_path = info["transcript_path"]
    path = os.path.dirname(transcript_path)
    with open(transcript_path, 'r', encoding='utf-8') as file:
        for line in file:
            line = line.strip().replace("\\", "/")
            if not line:
                # Blank lines would otherwise register an empty transcript.
                continue
            items = line.split("|")
            wav, t = items[0], items[-1]
            wav = os.path.basename(wav)
            transcripts[t] = os.path.join(path, "reference_audio", wav)

    vq_model, ssl_model, t2s_model, hps, config, hz, max_sec = load_model(
        sovits_weight, gpt_weight)

    models.append(
        (
            i,
            title,
            cover,
            transcripts,
            example_reference,
            create_tts_fn(
                vq_model, ssl_model, t2s_model, hps, config, hz, max_sec
            )
        )
    )

with gr.Blocks() as app:
    gr.Markdown(
        "# GPT-SoVITS Demo\n"
    )
    with gr.Tabs():
        for (name, title, cover, transcripts, example_reference, tts_fn) in models:
            with gr.TabItem(name):
                with gr.Row():
                    # NOTE(review): the surrounding literals look like they
                    # once carried markup - confirm the intended wrapper.
                    gr.Markdown(
                        ' '
                        f'{title}'
                        ' ')
                with gr.Row():
                    with gr.Column():
                        prompt_text = gr.Dropdown(
                            label="Transcript of the Reference Audio",
                            value=example_reference if example_reference in transcripts else list(transcripts.keys())[0],
                            choices=list(transcripts.keys())
                        )
                        inp_ref_audio = gr.Audio(
                            label="Reference Audio",
                            type="filepath",
                            interactive=False,
                            value=transcripts[example_reference] if example_reference in transcripts else list(transcripts.values())[0]
                        )
                        transcripts_state = gr.State(value=transcripts)
                        prompt_text.change(
                            fn=change_reference_audio,
                            inputs=[prompt_text, transcripts_state],
                            outputs=[inp_ref_audio]
                        )
                        prompt_language = gr.State(value="ja")
                    with gr.Column():
                        text = gr.Textbox(label="Input Text", value="私はお兄ちゃんのだいだいだーいすきな妹なんだから、言うことなんでも聞いてくれますよね!")
                        text_language = gr.Dropdown(
                            label="Language",
                            choices=["ja"],
                            value="ja"
                        )
                        clean_button = gr.Button("Clean Text", variant="primary")
                        inference_button = gr.Button("Generate", variant="primary")
                        cleaned_text = gr.Textbox(label="Cleaned Text")
                        output = gr.Audio(label="Output Audio")
                        output_file = gr.File(label="Output Audio File")
                        om = gr.Textbox(label="Output Message")
                        clean_button.click(
                            fn=get_str_list_from_phone,
                            inputs=[text, text_language],
                            outputs=[cleaned_text]
                        )
                        inference_button.click(
                            fn=tts_fn,
                            inputs=[inp_ref_audio, prompt_text, prompt_language, cleaned_text, text_language, text],
                            outputs=[om, output, output_file]
                        )

app.launch(share=True)