Spaces:
Running
Running
import os,shutil,sys,pdb,re | |
# Remember the launch directory and make it the first import location.
now_dir = os.getcwd()
sys.path[:0] = [now_dir]
# The model generation is pinned to "v2" (the argv-based v1/v2 switch is
# disabled) and is also published as an environment variable.
version = "v2"
os.environ.update(version=version)
import json,yaml,warnings,torch | |
import platform | |
import psutil | |
import signal | |
# Silence warnings and fix the torch RNG seed.
warnings.filterwarnings("ignore")
torch.manual_seed(233333)

# Use a TEMP folder under the launch directory and publish it via the
# TEMP environment variable.
tmp = os.path.join(now_dir, "TEMP")
os.makedirs(tmp, exist_ok=True)
os.environ["TEMP"] = tmp

# Empty TEMP on startup; only jieba.cache is kept.
for name in (os.listdir(tmp) if os.path.exists(tmp) else ()):
    if name == "jieba.cache":
        continue
    path = "%s/%s" % (tmp, name)
    try:
        if os.path.isfile(path):
            os.remove(path)
        else:
            shutil.rmtree(path)
    except Exception as e:
        # Best effort: report the failure and continue with the next entry.
        print(str(e))
import site | |
# Candidate site-packages directories; fall back to the bundled runtime
# location when none is reported.
site_packages_roots = [root for root in site.getsitepackages() if "packages" in root]
if not site_packages_roots:
    site_packages_roots = ["%s/runtime/Lib/site-packages" % now_dir]
# os.environ["OPENBLAS_NUM_THREADS"] = "4"
# Exclude loopback addresses from proxying and clear all_proxy.
os.environ["no_proxy"] = "localhost, 127.0.0.1, ::1"
os.environ["all_proxy"] = ""
# Write a users.pth file listing the project directories into the first
# existing site-packages directory that accepts the write.
for site_packages_root in site_packages_roots:
    if not os.path.exists(site_packages_root):
        continue
    try:
        with open("%s/users.pth" % (site_packages_root), "w") as f:
            f.write(
                "%s\n%s/tools\n%s/tools/damo_asr\n%s/GPT_SoVITS\n%s/tools/uvr5"
                % ((now_dir,) * 5)
            )
    except PermissionError:
        # Not writable: try the next candidate.
        continue
    break
from tools import my_utils | |
import traceback | |
import shutil | |
import pdb | |
import gradio as gr | |
from subprocess import Popen | |
import signal | |
from config import python_exec,infer_device,is_half,exp_root,webui_port_main,webui_port_infer_tts,webui_port_uvr5,webui_port_subfix,is_share | |
from tools.i18n.i18n import I18nAuto | |
i18n = I18nAuto() | |
from scipy.io import wavfile | |
from tools.my_utils import load_audio | |
from multiprocessing import cpu_count | |
# os.environ['PYTORCH_ENABLE_MPS_FALLBACK'] = '1' # 当遇到mps不支持的步骤时使用cpu | |
n_cpu = cpu_count()
ngpu = torch.cuda.device_count()
gpu_infos = []  # one "<index>\t<name>" entry per accepted device
mem = []  # memory of each accepted device, in GiB (rounded via +0.4 then truncated)
if_gpu_ok = False
# Check whether there is an NVIDIA card usable for training / accelerated inference.
# A device is accepted when its upper-cased name contains any of these substrings.
ok_gpu_keywords = {"10","16","20","30","40","A2","A3","A4","P4","A50","500","A60","70","80","90","M4","T4","TITAN","L4","4060","H"}
set_gpu_numbers = set()
if torch.cuda.is_available() or ngpu != 0:
    for i in range(ngpu):
        gpu_name = torch.cuda.get_device_name(i)
        if any(value in gpu_name.upper() for value in ok_gpu_keywords):
            # e.g. A10, A100, V100, A40, P40, M40, K80, A4500
            if_gpu_ok = True  # at least one usable NVIDIA card
            gpu_infos.append("%s\t%s" % (i, gpu_name))
            set_gpu_numbers.add(i)
            mem.append(int(torch.cuda.get_device_properties(i).total_memory / 1024 / 1024 / 1024 + 0.4))
# # Check for MPS acceleration support
# if torch.backends.mps.is_available():
#     if_gpu_ok = True
#     gpu_infos.append("%s\t%s" % ("0", "Apple GPU"))
#     mem.append(psutil.virtual_memory().total/ 1024 / 1024 / 1024) # in testing, using system memory as video memory did not run out of memory
if if_gpu_ok and len(gpu_infos) > 0:
    gpu_info = "\n".join(gpu_infos)
    default_batch_size = min(mem) // 2
else:
    # No accepted GPU: register a single pseudo-device "0" labelled CPU and
    # derive the default batch size from half the system memory in GiB.
    gpu_info = ("%s\t%s" % ("0", "CPU"))
    gpu_infos.append("%s\t%s" % ("0", "CPU"))
    set_gpu_numbers.add(0)
    default_batch_size = int(psutil.virtual_memory().total / 1024 / 1024 / 1024 / 2)
# Join the device indices with "-". Take everything before the tab rather
# than the first character, so indices >= 10 are not truncated ("10" -> "1").
gpus = "-".join([info.split("\t", 1)[0] for info in gpu_infos])
default_gpu_numbers = str(sorted(list(set_gpu_numbers))[0])
def fix_gpu_number(input):  # force an out-of-range number back into range
    """Return *input* if it names a known GPU index, else the default index.

    Values that cannot be converted with ``int()`` are returned unchanged.
    """
    try:
        if int(input) not in set_gpu_numbers:
            return default_gpu_numbers
    except (TypeError, ValueError, OverflowError):
        # Not integer-like: hand the value back untouched. Only conversion
        # errors are caught so KeyboardInterrupt/SystemExit still propagate.
        return input
    return input
def fix_gpu_numbers(inputs):
    """Apply ``fix_gpu_number`` to every item of a comma-separated string.

    Returns the corrected items joined with ",". If *inputs* cannot be
    split as a string, it is returned unchanged.
    """
    try:
        return ",".join(str(fix_gpu_number(item)) for item in inputs.split(","))
    except (AttributeError, TypeError):
        # *inputs* is not a str (no usable .split): leave it as it is.
        return inputs
# Pretrained checkpoints; each is always the first entry of its list.
pretrained_sovits_name = "GPT_SoVITS/pretrained_models/s2G2333k.pth"
pretrained_gpt_name = "GPT_SoVITS/pretrained_models/s1bert25hz-5kh-longer-epoch=12-step=369668.ckpt"

# Folders scanned for weight files.
SoVITS_weight_root = "SoVITS_weights"
GPT_weight_root = "GPT_weights"


def get_weights_names():
    """Return ``(SoVITS_names, GPT_names)``.

    Each list starts with the pretrained checkpoint, followed by the
    ``.pth`` / ``.ckpt`` files found in the matching weight folder.
    """
    sovits_found = [name for name in os.listdir(SoVITS_weight_root) if name.endswith(".pth")]
    gpt_found = [name for name in os.listdir(GPT_weight_root) if name.endswith(".ckpt")]
    return [pretrained_sovits_name] + sovits_found, [pretrained_gpt_name] + gpt_found


# Make sure both folders exist before the first scan.
os.makedirs(SoVITS_weight_root, exist_ok=True)
os.makedirs(GPT_weight_root, exist_ok=True)
SoVITS_names, GPT_names = get_weights_names()
def custom_sort_key(s):
    """Return a natural-sort key for *s*.

    The string is split into alternating text and digit runs; digit runs
    become ``int`` so that e.g. "e2" sorts before "e10".
    """
    # Split on digit runs; the capturing group keeps the runs in the result.
    # Raw string: "\d" in a normal literal is an invalid escape sequence.
    parts = re.split(r'(\d+)', s)
    # With one capturing group, odd indices are exactly the \d+ matches.
    # Converting by position (instead of str.isdigit) avoids int() failing
    # on characters such as "²" that isdigit() accepts but \d does not match.
    return [int(part) if index % 2 else part for index, part in enumerate(parts)]
def change_choices():
    """Rescan the weight folders and return two dropdown update dicts.

    The first dict carries the naturally sorted SoVITS names, the second
    the naturally sorted GPT names.
    """
    sovits_found, gpt_found = get_weights_names()
    sovits_update = {"choices": sorted(sovits_found, key=custom_sort_key), "__type__": "update"}
    gpt_update = {"choices": sorted(gpt_found, key=custom_sort_key), "__type__": "update"}
    return sovits_update, gpt_update
# Handles of the helper subprocesses started from the UI callbacks below;
# None means the corresponding process is not running.
p_label = p_uvr5 = p_asr = p_denoise = p_tts_inference = None
def kill_proc_tree(pid, including_parent=True):
    """Send SIGTERM to every descendant of *pid*, then optionally to *pid*.

    Does nothing if the process no longer exists. An ``OSError`` raised
    while signalling an individual process is ignored.
    """
    try:
        root = psutil.Process(pid)
    except psutil.NoSuchProcess:
        # Process already terminated
        return
    # Descendants first, the parent (if requested) last.
    targets = [child.pid for child in root.children(recursive=True)]
    if including_parent:
        targets.append(root.pid)
    for target_pid in targets:
        try:
            os.kill(target_pid, signal.SIGTERM)  # or signal.SIGKILL
        except OSError:
            pass
system = platform.system()


def kill_process(pid):
    """Terminate *pid* and its child processes.

    Uses ``taskkill /t /f`` on Windows and ``kill_proc_tree`` elsewhere.
    """
    if system != "Windows":
        kill_proc_tree(pid)
        return
    cmd = "taskkill /t /f /pid %s" % pid
    os.system(cmd)
def change_label(if_label,path_list):
    """Start or stop the labelling WebUI subprocess (generator).

    Yields one status message when a process is started or stopped, and
    nothing when the checkbox state already matches the process state.
    """
    global p_label
    if if_label == True and p_label is None:
        path_list = my_utils.clean_path(path_list)
        launch_args = (python_exec, path_list, webui_port_subfix, is_share)
        # NOTE(review): path_list comes from a UI text box and is placed in a
        # shell command line; confirm clean_path makes it safe to interpolate.
        cmd = '"%s" tools/subfix_webui.py --load_list "%s" --webui_port %s --is_share %s' % launch_args
        yield i18n("打标工具WebUI已开启")
        print(cmd)
        p_label = Popen(cmd, shell=True)
        return
    if if_label == False and p_label is not None:
        kill_process(p_label.pid)
        p_label = None
        yield i18n("打标工具WebUI已关闭")
def change_uvr5(if_uvr5):
    """Start or stop the UVR5 WebUI subprocess (generator).

    Yields one status message when a process is started or stopped, and
    nothing when the checkbox state already matches the process state.
    """
    global p_uvr5
    if if_uvr5 == True and p_uvr5 is None:
        launch_args = (python_exec, infer_device, is_half, webui_port_uvr5, is_share)
        cmd = '"%s" tools/uvr5/webui.py "%s" %s %s %s' % launch_args
        yield i18n("UVR5已开启")
        print(cmd)
        p_uvr5 = Popen(cmd, shell=True)
        return
    if if_uvr5 == False and p_uvr5 is not None:
        kill_process(p_uvr5.pid)
        p_uvr5 = None
        yield i18n("UVR5已关闭")
def change_tts_inference(if_tts,bert_path,cnhubert_base_path,gpu_number,gpt_path,sovits_path):
    """Start or stop the TTS inference WebUI subprocess (generator).

    On start, the model paths and runtime options are written to
    environment variables before the process is launched. A weight name
    without "/" is resolved against its weight folder.
    """
    global p_tts_inference
    if if_tts == True and p_tts_inference is None:
        gpt_full = gpt_path if "/" in gpt_path else "%s/%s" % (GPT_weight_root, gpt_path)
        os.environ["gpt_path"] = gpt_full
        sovits_full = sovits_path if "/" in sovits_path else "%s/%s" % (SoVITS_weight_root, sovits_path)
        os.environ["sovits_path"] = sovits_full
        os.environ["cnhubert_base_path"] = cnhubert_base_path
        os.environ["bert_path"] = bert_path
        os.environ["_CUDA_VISIBLE_DEVICES"] = fix_gpu_number(gpu_number)
        os.environ["is_half"] = str(is_half)
        os.environ["infer_ttswebui"] = str(webui_port_infer_tts)
        os.environ["is_share"] = str(is_share)
        cmd = '"%s" GPT_SoVITS/inference_webui.py' % (python_exec)
        yield i18n("TTS推理进程已开启")
        print(cmd)
        p_tts_inference = Popen(cmd, shell=True)
        return
    if if_tts == False and p_tts_inference is not None:
        kill_process(p_tts_inference.pid)
        p_tts_inference = None
        yield i18n("TTS推理进程已关闭")
from tools.asr.config import asr_dict | |
def open_asr(asr_inp_dir, asr_opt_dir, asr_model, asr_model_size, asr_lang, asr_precision):
    """Run the selected ASR script as a subprocess (generator).

    Each yield is ``(status, open-button update, close-button update,
    list-path update)``. The final yield carries the absolute path of the
    expected ``<input folder name>.list`` file in the output folder.
    """
    global p_asr

    def shown(flag):
        # A fresh visibility-update dict for every use.
        return {"__type__":"update","visible":flag}

    if p_asr is not None:
        yield "已有正在进行的ASR任务,需先终止才能开启下一次任务", shown(False), shown(True), {"__type__":"update"}
        return

    asr_inp_dir = my_utils.clean_path(asr_inp_dir)
    asr_opt_dir = my_utils.clean_path(asr_opt_dir)
    cmd = "".join([
        f'"{python_exec}" tools/asr/{asr_dict[asr_model]["path"]}',
        f' -i "{asr_inp_dir}"',
        f' -o "{asr_opt_dir}"',
        f' -s {asr_model_size}',
        f' -l {asr_lang}',
        f" -p {asr_precision}",
    ])
    output_file_name = os.path.basename(asr_inp_dir)
    output_folder = asr_opt_dir or "output/asr_opt"
    output_file_path = os.path.abspath(f'{output_folder}/{output_file_name}.list')
    yield "ASR任务开启:%s"%cmd, shown(False), shown(True), {"__type__":"update"}
    print(cmd)
    p_asr = Popen(cmd, shell=True)
    p_asr.wait()
    p_asr = None
    yield "ASR任务完成, 查看终端进行下一步", shown(True), shown(False), {"__type__":"update","value":output_file_path}
def close_asr():
    """Kill the ASR subprocess if one is recorded and reset the buttons.

    Returns ``(status, open-button update, close-button update)``.
    """
    global p_asr
    if p_asr is not None:
        kill_process(p_asr.pid)
        p_asr = None
    show_open = {"__type__":"update","visible":True}
    hide_close = {"__type__":"update","visible":False}
    return "已终止ASR进程", show_open, hide_close
def open_denoise(denoise_inp_dir, denoise_opt_dir):
    """Run the denoise script as a subprocess (generator).

    Each yield is ``(status, open-button update, close-button update,
    ASR-input update)``. The final yield carries the denoise output
    folder as the new value.
    """
    global p_denoise

    def shown(flag):
        # A fresh visibility-update dict for every use.
        return {"__type__":"update","visible":flag}

    if p_denoise is not None:
        yield "已有正在进行的语音降噪任务,需先终止才能开启下一次任务", shown(False), shown(True), {"__type__":"update"}
        return

    denoise_inp_dir = my_utils.clean_path(denoise_inp_dir)
    denoise_opt_dir = my_utils.clean_path(denoise_opt_dir)
    precision = "float16" if is_half == True else "float32"
    cmd = '"%s" tools/cmd-denoise.py -i "%s" -o "%s" -p %s' % (python_exec, denoise_inp_dir, denoise_opt_dir, precision)
    yield "语音降噪任务开启:%s"%cmd, shown(False), shown(True), {"__type__":"update"}
    print(cmd)
    p_denoise = Popen(cmd, shell=True)
    p_denoise.wait()
    p_denoise = None
    yield "语音降噪任务完成, 查看终端进行下一步", shown(True), shown(False), {"__type__":"update","value":denoise_opt_dir}
def close_denoise():
    """Kill the denoise subprocess if one is recorded and reset the buttons.

    Returns ``(status, open-button update, close-button update)``.
    """
    global p_denoise
    if p_denoise is not None:
        kill_process(p_denoise.pid)
        p_denoise = None
    show_open = {"__type__":"update","visible":True}
    hide_close = {"__type__":"update","visible":False}
    return "已终止语音降噪进程", show_open, hide_close
p_train_SoVITS = None  # handle of the running SoVITS training process, if any


def open1Ba(batch_size,total_epoch,exp_name,text_low_lr_rate,if_save_latest,if_save_every_weights,save_every_epoch,gpu_numbers1Ba,pretrained_s2G,pretrained_s2D):
    """Write a temporary s2 config and run SoVITS training (generator).

    Each yield is ``(status, open-button update, close-button update)``.
    When ``is_half`` is False, fp16 is disabled and the batch size is
    halved (minimum 1).
    """
    global p_train_SoVITS

    def shown(flag):
        # A fresh visibility-update dict for every use.
        return {"__type__":"update","visible":flag}

    if p_train_SoVITS is not None:
        yield "已有正在进行的SoVITS训练任务,需先终止才能开启下一次任务", shown(False), shown(True)
        return

    with open("GPT_SoVITS/configs/s2.json") as f:
        data = json.loads(f.read())
    s2_dir = "%s/%s" % (exp_root, exp_name)
    os.makedirs("%s/logs_s2" % (s2_dir), exist_ok=True)
    train_cfg = data["train"]
    if is_half == False:
        train_cfg["fp16_run"] = False
        batch_size = max(1, batch_size // 2)
    train_cfg["batch_size"] = batch_size
    train_cfg["epochs"] = total_epoch
    train_cfg["text_low_lr_rate"] = text_low_lr_rate
    train_cfg["pretrained_s2G"] = pretrained_s2G
    train_cfg["pretrained_s2D"] = pretrained_s2D
    train_cfg["if_save_latest"] = if_save_latest
    train_cfg["if_save_every_weights"] = if_save_every_weights
    train_cfg["save_every_epoch"] = save_every_epoch
    train_cfg["gpu_numbers"] = gpu_numbers1Ba
    data["data"]["exp_dir"] = s2_dir
    data["s2_ckpt_dir"] = s2_dir
    data["save_weight_dir"] = SoVITS_weight_root
    data["name"] = exp_name
    tmp_config_path = "%s/tmp_s2.json" % tmp
    with open(tmp_config_path, "w") as f:
        f.write(json.dumps(data))

    cmd = '"%s" GPT_SoVITS/s2_train.py --config "%s"' % (python_exec, tmp_config_path)
    yield "SoVITS训练开始:%s"%cmd, shown(False), shown(True)
    print(cmd)
    p_train_SoVITS = Popen(cmd, shell=True)
    p_train_SoVITS.wait()
    p_train_SoVITS = None
    yield "SoVITS训练完成", shown(True), shown(False)
def close1Ba():
    """Kill the SoVITS training process if one is recorded.

    Returns ``(status, open-button update, close-button update)``.
    """
    global p_train_SoVITS
    if p_train_SoVITS is not None:
        kill_process(p_train_SoVITS.pid)
        p_train_SoVITS = None
    show_open = {"__type__":"update","visible":True}
    hide_close = {"__type__":"update","visible":False}
    return "已终止SoVITS训练", show_open, hide_close
p_train_GPT = None  # handle of the running GPT training process, if any


def open1Bb(batch_size,total_epoch,exp_name,if_dpo,if_save_latest,if_save_every_weights,save_every_epoch,gpu_numbers,pretrained_s1):
    """Write a temporary s1 config and run GPT training (generator).

    Each yield is ``(status, open-button update, close-button update)``.
    When ``is_half`` is False, precision is set to "32" and the batch
    size is halved (minimum 1).
    """
    global p_train_GPT

    def shown(flag):
        # A fresh visibility-update dict for every use.
        return {"__type__":"update","visible":flag}

    if p_train_GPT is not None:
        yield "已有正在进行的GPT训练任务,需先终止才能开启下一次任务", shown(False), shown(True)
        return

    # The base config depends on the model generation.
    if version == "v1":
        base_config = "GPT_SoVITS/configs/s1longer.yaml"
    else:
        base_config = "GPT_SoVITS/configs/s1longer-v2.yaml"
    with open(base_config) as f:
        data = yaml.load(f.read(), Loader=yaml.FullLoader)
    s1_dir = "%s/%s" % (exp_root, exp_name)
    os.makedirs("%s/logs_s1" % (s1_dir), exist_ok=True)
    train_cfg = data["train"]
    if is_half == False:
        train_cfg["precision"] = "32"
        batch_size = max(1, batch_size // 2)
    train_cfg["batch_size"] = batch_size
    train_cfg["epochs"] = total_epoch
    data["pretrained_s1"] = pretrained_s1
    train_cfg["save_every_n_epoch"] = save_every_epoch
    train_cfg["if_save_every_weights"] = if_save_every_weights
    train_cfg["if_save_latest"] = if_save_latest
    train_cfg["if_dpo"] = if_dpo
    train_cfg["half_weights_save_dir"] = GPT_weight_root
    train_cfg["exp_name"] = exp_name
    data["train_semantic_path"] = "%s/6-name2semantic.tsv" % s1_dir
    data["train_phoneme_path"] = "%s/2-name2text.txt" % s1_dir
    data["output_dir"] = "%s/logs_s1" % s1_dir

    # The GPU list uses "-" in the UI; it is converted to "," before export.
    os.environ["_CUDA_VISIBLE_DEVICES"] = fix_gpu_numbers(gpu_numbers.replace("-", ","))
    os.environ["hz"] = "25hz"
    tmp_config_path = "%s/tmp_s1.yaml" % tmp
    with open(tmp_config_path, "w") as f:
        f.write(yaml.dump(data, default_flow_style=False))
    # cmd = '"%s" GPT_SoVITS/s1_train.py --config_file "%s" --train_semantic_path "%s/6-name2semantic.tsv" --train_phoneme_path "%s/2-name2text.txt" --output_dir "%s/logs_s1"'%(python_exec,tmp_config_path,s1_dir,s1_dir,s1_dir)
    cmd = '"%s" GPT_SoVITS/s1_train.py --config_file "%s" ' % (python_exec, tmp_config_path)
    yield "GPT训练开始:%s"%cmd, shown(False), shown(True)
    print(cmd)
    p_train_GPT = Popen(cmd, shell=True)
    p_train_GPT.wait()
    p_train_GPT = None
    yield "GPT训练完成", shown(True), shown(False)
def close1Bb():
    """Kill the GPT training process if one is recorded.

    Returns ``(status, open-button update, close-button update)``.
    """
    global p_train_GPT
    if p_train_GPT is not None:
        kill_process(p_train_GPT.pid)
        p_train_GPT = None
    show_open = {"__type__":"update","visible":True}
    hide_close = {"__type__":"update","visible":False}
    return "已终止GPT训练", show_open, hide_close
ps_slice = []  # handles of the running slicer processes


def open_slice(inp,opt_root,threshold,min_length,min_interval,hop_size,max_sil_kept,_max,alpha,n_parts):
    """Run the audio slicer in ``n_parts`` parallel subprocesses (generator).

    Each yield is ``(status, open-button update, close-button update,
    update, update)``; the last two receive ``opt_root`` as their value
    once slicing has finished. A single input file always uses one process.
    """
    global ps_slice

    def shown(flag):
        # A fresh visibility-update dict for every use.
        return {"__type__": "update", "visible": flag}

    def untouched():
        return {"__type__": "update"}

    inp = my_utils.clean_path(inp)
    opt_root = my_utils.clean_path(opt_root)
    if not os.path.exists(inp):
        yield "输入路径不存在", shown(True), shown(False), untouched(), untouched()
        return
    if os.path.isfile(inp):
        n_parts = 1
    elif not os.path.isdir(inp):
        yield "输入路径存在但既不是文件也不是文件夹", shown(True), shown(False), untouched(), untouched()
        return
    if ps_slice:
        yield "已有正在进行的切割任务,需先终止才能开启下一次任务", shown(False), shown(True), untouched(), untouched()
        return

    for i_part in range(n_parts):
        cmd = '"%s" tools/slice_audio.py "%s" "%s" %s %s %s %s %s %s %s %s %s' % (
            python_exec, inp, opt_root, threshold, min_length, min_interval,
            hop_size, max_sil_kept, _max, alpha, i_part, n_parts,
        )
        print(cmd)
        ps_slice.append(Popen(cmd, shell=True))
    yield "切割执行中", shown(False), shown(True), untouched(), untouched()
    for p in ps_slice:
        p.wait()
    ps_slice = []
    yield "切割结束", shown(True), shown(False), {"__type__": "update", "value":opt_root}, {"__type__": "update", "value":opt_root}
def close_slice():
    """Kill every recorded slicer process and clear the list.

    Returns ``(status, open-button update, close-button update)``.
    """
    global ps_slice
    for p_slice in ps_slice:
        try:
            kill_process(p_slice.pid)
        except Exception:
            # Best effort per process; unlike a bare except this lets
            # KeyboardInterrupt/SystemExit propagate.
            traceback.print_exc()
    ps_slice = []
    return "已终止所有切割进程", {"__type__": "update", "visible": True}, {"__type__": "update", "visible": False}
ps1a = []  # handles of the running 1a (text) processes


def open1a(inp_text,inp_wav_dir,exp_name,gpu_numbers,bert_pretrained_dir):
    """Run ``1-get-text.py`` once per "-"-separated GPU entry (generator).

    Parameters are passed to the children through environment variables.
    After all children exit, the per-part ``2-name2text-<i>.txt`` files
    are merged into ``2-name2text.txt`` and removed. Each yield is
    ``(status, open-button update, close-button update)``.
    """
    global ps1a

    def shown(flag):
        # A fresh visibility-update dict for every use.
        return {"__type__": "update", "visible": flag}

    inp_text = my_utils.clean_path(inp_text)
    inp_wav_dir = my_utils.clean_path(inp_wav_dir)
    if ps1a:
        yield "已有正在进行的文本任务,需先终止才能开启下一次任务", shown(False), shown(True)
        return

    opt_dir = "%s/%s" % (exp_root, exp_name)
    config = {
        "inp_text": inp_text,
        "inp_wav_dir": inp_wav_dir,
        "exp_name": exp_name,
        "opt_dir": opt_dir,
        "bert_pretrained_dir": bert_pretrained_dir,
    }
    gpu_names = gpu_numbers.split("-")
    all_parts = len(gpu_names)
    for i_part, gpu_name in enumerate(gpu_names):
        config.update(
            {
                "i_part": str(i_part),
                "all_parts": str(all_parts),
                "_CUDA_VISIBLE_DEVICES": fix_gpu_number(gpu_name),
                "is_half": str(is_half),
            }
        )
        # Each child is started right after the environment is updated for its part.
        os.environ.update(config)
        cmd = '"%s" GPT_SoVITS/prepare_datasets/1-get-text.py' % python_exec
        print(cmd)
        ps1a.append(Popen(cmd, shell=True))
    yield "文本进程执行中", shown(False), shown(True)
    for p in ps1a:
        p.wait()

    # Merge the per-part outputs in part order.
    opt = []
    for i_part in range(all_parts):
        txt_path = "%s/2-name2text-%s.txt" % (opt_dir, i_part)
        with open(txt_path, "r", encoding="utf8") as f:
            opt += f.read().strip("\n").split("\n")
        os.remove(txt_path)
    path_text = "%s/2-name2text.txt" % opt_dir
    with open(path_text, "w", encoding="utf8") as f:
        f.write("\n".join(opt) + "\n")
    ps1a = []
    # Success means at least one non-empty merged line.
    status = "文本进程成功" if "".join(opt) else "文本进程失败"
    yield status, shown(True), shown(False)
def close1a():
    """Kill every recorded 1a (text) process and clear the list.

    Returns ``(status, open-button update, close-button update)``.
    """
    global ps1a
    for p1a in ps1a:
        try:
            kill_process(p1a.pid)
        except Exception:
            # Best effort per process; unlike a bare except this lets
            # KeyboardInterrupt/SystemExit propagate.
            traceback.print_exc()
    ps1a = []
    return "已终止所有1a进程", {"__type__": "update", "visible": True}, {"__type__": "update", "visible": False}
ps1b = []  # handles of the running 1b (SSL extraction) processes


def open1b(inp_text,inp_wav_dir,exp_name,gpu_numbers,ssl_pretrained_dir):
    """Run ``2-get-hubert-wav32k.py`` once per "-"-separated GPU entry (generator).

    Parameters are passed to the children through environment variables.
    Each yield is ``(status, open-button update, close-button update)``.
    """
    global ps1b

    def shown(flag):
        # A fresh visibility-update dict for every use.
        return {"__type__": "update", "visible": flag}

    inp_text = my_utils.clean_path(inp_text)
    inp_wav_dir = my_utils.clean_path(inp_wav_dir)
    if ps1b:
        yield "已有正在进行的SSL提取任务,需先终止才能开启下一次任务", shown(False), shown(True)
        return

    config = {
        "inp_text": inp_text,
        "inp_wav_dir": inp_wav_dir,
        "exp_name": exp_name,
        "opt_dir": "%s/%s" % (exp_root, exp_name),
        "cnhubert_base_dir": ssl_pretrained_dir,
        "is_half": str(is_half),
    }
    gpu_names = gpu_numbers.split("-")
    all_parts = len(gpu_names)
    for i_part, gpu_name in enumerate(gpu_names):
        config.update(
            {
                "i_part": str(i_part),
                "all_parts": str(all_parts),
                "_CUDA_VISIBLE_DEVICES": fix_gpu_number(gpu_name),
            }
        )
        # Each child is started right after the environment is updated for its part.
        os.environ.update(config)
        cmd = '"%s" GPT_SoVITS/prepare_datasets/2-get-hubert-wav32k.py' % python_exec
        print(cmd)
        ps1b.append(Popen(cmd, shell=True))
    yield "SSL提取进程执行中", shown(False), shown(True)
    for p in ps1b:
        p.wait()
    ps1b = []
    yield "SSL提取进程结束", shown(True), shown(False)
def close1b():
    """Kill every recorded 1b (SSL extraction) process and clear the list.

    Returns ``(status, open-button update, close-button update)``.
    """
    global ps1b
    for p1b in ps1b:
        try:
            kill_process(p1b.pid)
        except Exception:
            # Best effort per process; unlike a bare except this lets
            # KeyboardInterrupt/SystemExit propagate.
            traceback.print_exc()
    ps1b = []
    return "已终止所有1b进程", {"__type__": "update", "visible": True}, {"__type__": "update", "visible": False}
ps1c = []  # handles of the running 1c (semantic token) processes


def open1c(inp_text,exp_name,gpu_numbers,pretrained_s2G_path):
    """Run ``3-get-semantic.py`` once per "-"-separated GPU entry (generator).

    Parameters are passed to the children through environment variables.
    After all children exit, the per-part ``6-name2semantic-<i>.tsv``
    files are merged (under a header row) into ``6-name2semantic.tsv``
    and removed. Each yield is ``(status, open-button update,
    close-button update)``.
    """
    global ps1c

    def shown(flag):
        # A fresh visibility-update dict for every use.
        return {"__type__": "update", "visible": flag}

    inp_text = my_utils.clean_path(inp_text)
    if ps1c:
        yield "已有正在进行的语义token提取任务,需先终止才能开启下一次任务", shown(False), shown(True)
        return

    opt_dir = "%s/%s" % (exp_root, exp_name)
    config = {
        "inp_text": inp_text,
        "exp_name": exp_name,
        "opt_dir": opt_dir,
        "pretrained_s2G": pretrained_s2G_path,
        "s2config_path": "GPT_SoVITS/configs/s2.json",
        "is_half": str(is_half),
    }
    gpu_names = gpu_numbers.split("-")
    all_parts = len(gpu_names)
    for i_part, gpu_name in enumerate(gpu_names):
        config.update(
            {
                "i_part": str(i_part),
                "all_parts": str(all_parts),
                "_CUDA_VISIBLE_DEVICES": fix_gpu_number(gpu_name),
            }
        )
        # Each child is started right after the environment is updated for its part.
        os.environ.update(config)
        cmd = '"%s" GPT_SoVITS/prepare_datasets/3-get-semantic.py' % python_exec
        print(cmd)
        ps1c.append(Popen(cmd, shell=True))
    yield "语义token提取进程执行中", shown(False), shown(True)
    for p in ps1c:
        p.wait()

    # Merge the per-part outputs in part order, below the header row.
    opt = ["item_name\tsemantic_audio"]
    path_semantic = "%s/6-name2semantic.tsv" % opt_dir
    for i_part in range(all_parts):
        semantic_path = "%s/6-name2semantic-%s.tsv" % (opt_dir, i_part)
        with open(semantic_path, "r", encoding="utf8") as f:
            opt += f.read().strip("\n").split("\n")
        os.remove(semantic_path)
    with open(path_semantic, "w", encoding="utf8") as f:
        f.write("\n".join(opt) + "\n")
    ps1c = []
    yield "语义token提取进程结束", shown(True), shown(False)
def close1c():
    """Kill every recorded 1c (semantic token) process and clear the list.

    Returns ``(status, open-button update, close-button update)``.
    """
    global ps1c
    for p1c in ps1c:
        try:
            kill_process(p1c.pid)
        except Exception:
            # Best effort per process; unlike a bare except this lets
            # KeyboardInterrupt/SystemExit propagate.
            traceback.print_exc()
    ps1c = []
    return "已终止所有语义token进程", {"__type__": "update", "visible": True}, {"__type__": "update", "visible": False}
#####inp_text,inp_wav_dir,exp_name,gpu_numbers1a,gpu_numbers1Ba,gpu_numbers1c,bert_pretrained_dir,cnhubert_base_dir,pretrained_s2G
ps1abc = []  # handles of the processes of the stage currently running


def open1abc(inp_text,inp_wav_dir,exp_name,gpu_numbers1a,gpu_numbers1Ba,gpu_numbers1c,bert_pretrained_dir,ssl_pretrained_dir,pretrained_s2G_path):
    """Run dataset stages 1a, 1b and 1c back to back (generator).

    Stage 1a (text) is skipped when ``2-name2text.txt`` already has at
    least two lines; stage 1c (semantic tokens) is skipped when
    ``6-name2semantic.tsv`` is at least 31 bytes; stage 1b always runs.
    Each stage starts one child per "-"-separated GPU entry and passes
    its parameters through environment variables.

    Each yield is ``(status, open-button update, close-button update)``.
    Any exception inside the pipeline is printed, the recorded processes
    are killed via ``close1abc`` and an error status is yielded.
    """
    global ps1abc
    inp_text = my_utils.clean_path(inp_text)
    inp_wav_dir = my_utils.clean_path(inp_wav_dir)
    if ps1abc != []:
        yield "已有正在进行的一键三连任务,需先终止才能开启下一次任务", {"__type__": "update", "visible": False}, {"__type__": "update", "visible": True}
        return

    opt_dir = "%s/%s" % (exp_root, exp_name)
    try:
        #############################1a
        path_text = "%s/2-name2text.txt" % opt_dir
        need_text = not os.path.exists(path_text)
        if not need_text:
            # Read through a context manager so the handle is closed.
            with open(path_text, "r", encoding="utf8") as f:
                need_text = len(f.read().strip("\n").split("\n")) < 2
        if need_text:
            config = {
                "inp_text": inp_text,
                "inp_wav_dir": inp_wav_dir,
                "exp_name": exp_name,
                "opt_dir": opt_dir,
                "bert_pretrained_dir": bert_pretrained_dir,
                "is_half": str(is_half),
            }
            gpu_names = gpu_numbers1a.split("-")
            all_parts = len(gpu_names)
            for i_part in range(all_parts):
                config.update(
                    {
                        "i_part": str(i_part),
                        "all_parts": str(all_parts),
                        "_CUDA_VISIBLE_DEVICES": fix_gpu_number(gpu_names[i_part]),
                    }
                )
                os.environ.update(config)
                cmd = '"%s" GPT_SoVITS/prepare_datasets/1-get-text.py' % python_exec
                print(cmd)
                p = Popen(cmd, shell=True)
                ps1abc.append(p)
            yield "进度:1a-ing", {"__type__": "update", "visible": False}, {"__type__": "update", "visible": True}
            for p in ps1abc:
                p.wait()

            # Merge the per-part text files in part order.
            opt = []
            for i_part in range(all_parts):
                txt_path = "%s/2-name2text-%s.txt" % (opt_dir, i_part)
                with open(txt_path, "r", encoding="utf8") as f:
                    opt += f.read().strip("\n").split("\n")
                os.remove(txt_path)
            with open(path_text, "w", encoding="utf8") as f:
                f.write("\n".join(opt) + "\n")
            # Explicit raise instead of assert: the check must survive `python -O`.
            if len("".join(opt)) == 0:
                raise RuntimeError("1Aa-文本获取进程失败")
        yield "进度:1a-done", {"__type__": "update", "visible": False}, {"__type__": "update", "visible": True}
        ps1abc = []
        #############################1b
        config = {
            "inp_text": inp_text,
            "inp_wav_dir": inp_wav_dir,
            "exp_name": exp_name,
            "opt_dir": opt_dir,
            "cnhubert_base_dir": ssl_pretrained_dir,
            # Passed explicitly, as open1b does; otherwise the value would
            # only be present if stage 1a (or an earlier call) had set it.
            "is_half": str(is_half),
        }
        gpu_names = gpu_numbers1Ba.split("-")
        all_parts = len(gpu_names)
        for i_part in range(all_parts):
            config.update(
                {
                    "i_part": str(i_part),
                    "all_parts": str(all_parts),
                    "_CUDA_VISIBLE_DEVICES": fix_gpu_number(gpu_names[i_part]),
                }
            )
            os.environ.update(config)
            cmd = '"%s" GPT_SoVITS/prepare_datasets/2-get-hubert-wav32k.py' % python_exec
            print(cmd)
            p = Popen(cmd, shell=True)
            ps1abc.append(p)
        yield "进度:1a-done, 1b-ing", {"__type__": "update", "visible": False}, {"__type__": "update", "visible": True}
        for p in ps1abc:
            p.wait()
        yield "进度:1a1b-done", {"__type__": "update", "visible": False}, {"__type__": "update", "visible": True}
        ps1abc = []
        #############################1c
        path_semantic = "%s/6-name2semantic.tsv" % opt_dir
        if not os.path.exists(path_semantic) or os.path.getsize(path_semantic) < 31:
            config = {
                "inp_text": inp_text,
                "exp_name": exp_name,
                "opt_dir": opt_dir,
                "pretrained_s2G": pretrained_s2G_path,
                "s2config_path": "GPT_SoVITS/configs/s2.json",
                # Passed explicitly, as open1c does.
                "is_half": str(is_half),
            }
            gpu_names = gpu_numbers1c.split("-")
            all_parts = len(gpu_names)
            for i_part in range(all_parts):
                config.update(
                    {
                        "i_part": str(i_part),
                        "all_parts": str(all_parts),
                        "_CUDA_VISIBLE_DEVICES": fix_gpu_number(gpu_names[i_part]),
                    }
                )
                os.environ.update(config)
                cmd = '"%s" GPT_SoVITS/prepare_datasets/3-get-semantic.py' % python_exec
                print(cmd)
                p = Popen(cmd, shell=True)
                ps1abc.append(p)
            yield "进度:1a1b-done, 1cing", {"__type__": "update", "visible": False}, {"__type__": "update", "visible": True}
            for p in ps1abc:
                p.wait()

            # Merge the per-part semantic files in part order, below the header row.
            opt = ["item_name\tsemantic_audio"]
            for i_part in range(all_parts):
                semantic_path = "%s/6-name2semantic-%s.tsv" % (opt_dir, i_part)
                with open(semantic_path, "r", encoding="utf8") as f:
                    opt += f.read().strip("\n").split("\n")
                os.remove(semantic_path)
            with open(path_semantic, "w", encoding="utf8") as f:
                f.write("\n".join(opt) + "\n")
            yield "进度:all-done", {"__type__": "update", "visible": False}, {"__type__": "update", "visible": True}
        ps1abc = []
        yield "一键三连进程结束", {"__type__": "update", "visible": True}, {"__type__": "update", "visible": False}
    except Exception:
        # `except Exception` (not a bare except) so that GeneratorExit is not
        # intercepted: yielding after GeneratorExit raises RuntimeError.
        traceback.print_exc()
        close1abc()
        yield "一键三连中途报错", {"__type__": "update", "visible": True}, {"__type__": "update", "visible": False}
def close1abc():
    """Kill every recorded one-click pipeline process and clear the list.

    Returns ``(status, open-button update, close-button update)``.
    """
    global ps1abc
    for p1abc in ps1abc:
        try:
            kill_process(p1abc.pid)
        except Exception:
            # Best effort per process; unlike a bare except this lets
            # KeyboardInterrupt/SystemExit propagate.
            traceback.print_exc()
    ps1abc = []
    return "已终止所有一键三连进程", {"__type__": "update", "visible": True}, {"__type__": "update", "visible": False}
with gr.Blocks(title="GPT-SoVITS WebUI") as app: | |
gr.Markdown( | |
value= | |
i18n("本软件以MIT协议开源, 作者不对软件具备任何控制力, 使用软件者、传播软件导出的声音者自负全责. <br>如不认可该条款, 则不能使用或引用软件包内任何代码和文件. 详见根目录<b>LICENSE</b>.") | |
) | |
gr.Markdown( | |
value= | |
i18n("中文教程文档:https://www.yuque.com/baicaigongchang1145haoyuangong/ib3g1e") | |
) | |
with gr.Tabs(): | |
with gr.TabItem(i18n("0-前置数据集获取工具")):#提前随机切片防止uvr5爆内存->uvr5->slicer->asr->打标 | |
gr.Markdown(value=i18n("0a-UVR5人声伴奏分离&去混响去延迟工具")) | |
with gr.Row(): | |
if_uvr5 = gr.Checkbox(label=i18n("是否开启UVR5-WebUI"),show_label=True) | |
uvr5_info = gr.Textbox(label=i18n("UVR5进程输出信息")) | |
gr.Markdown(value=i18n("0b-语音切分工具")) | |
with gr.Row(): | |
with gr.Column(scale=3): | |
with gr.Row(): | |
slice_inp_path=gr.Textbox(label=i18n("音频自动切分输入路径,可文件可文件夹"),value="") | |
slice_opt_root=gr.Textbox(label=i18n("切分后的子音频的输出根目录"),value="output/slicer_opt") | |
with gr.Row(): | |
threshold=gr.Textbox(label=i18n("threshold:音量小于这个值视作静音的备选切割点"),value="-34") | |
min_length=gr.Textbox(label=i18n("min_length:每段最小多长,如果第一段太短一直和后面段连起来直到超过这个值"),value="4000") | |
min_interval=gr.Textbox(label=i18n("min_interval:最短切割间隔"),value="300") | |
hop_size=gr.Textbox(label=i18n("hop_size:怎么算音量曲线,越小精度越大计算量越高(不是精度越大效果越好)"),value="10") | |
max_sil_kept=gr.Textbox(label=i18n("max_sil_kept:切完后静音最多留多长"),value="500") | |
with gr.Row(): | |
_max=gr.Slider(minimum=0,maximum=1,step=0.05,label=i18n("max:归一化后最大值多少"),value=0.9,interactive=True) | |
alpha=gr.Slider(minimum=0,maximum=1,step=0.05,label=i18n("alpha_mix:混多少比例归一化后音频进来"),value=0.25,interactive=True) | |
n_process=gr.Slider(minimum=1,maximum=n_cpu,step=1,label=i18n("切割使用的进程数"),value=4,interactive=True) | |
with gr.Row(): | |
slicer_info = gr.Textbox(label=i18n("语音切割进程输出信息")) | |
open_slicer_button=gr.Button(i18n("开启语音切割"), variant="primary",visible=True) | |
close_slicer_button=gr.Button(i18n("终止语音切割"), variant="primary",visible=False) | |
gr.Markdown(value=i18n("0bb-语音降噪工具")) | |
with gr.Row(): | |
with gr.Column(scale=3): | |
with gr.Row(): | |
denoise_input_dir=gr.Textbox(label=i18n("降噪音频文件输入文件夹"),value="") | |
denoise_output_dir=gr.Textbox(label=i18n("降噪结果输出文件夹"),value="output/denoise_opt") | |
with gr.Row(): | |
denoise_info = gr.Textbox(label=i18n("语音降噪进程输出信息")) | |
open_denoise_button = gr.Button(i18n("开启语音降噪"), variant="primary",visible=True) | |
close_denoise_button = gr.Button(i18n("终止语音降噪进程"), variant="primary",visible=False) | |
gr.Markdown(value=i18n("0c-中文批量离线ASR工具")) | |
with gr.Row(): | |
with gr.Column(scale=3): | |
with gr.Row(): | |
asr_inp_dir = gr.Textbox( | |
label=i18n("输入文件夹路径"), | |
value="D:\\GPT-SoVITS\\raw\\xxx", | |
interactive=True, | |
) | |
asr_opt_dir = gr.Textbox( | |
label = i18n("输出文件夹路径"), | |
value = "output/asr_opt", | |
interactive = True, | |
) | |
with gr.Row(): | |
asr_model = gr.Dropdown( | |
label = i18n("ASR 模型"), | |
choices = list(asr_dict.keys()), | |
interactive = True, | |
value="达摩 ASR (中文)" | |
) | |
asr_size = gr.Dropdown( | |
label = i18n("ASR 模型尺寸"), | |
choices = ["large"], | |
interactive = True, | |
value="large" | |
) | |
asr_lang = gr.Dropdown( | |
label = i18n("ASR 语言设置"), | |
choices = ["zh"], | |
interactive = True, | |
value="zh" | |
) | |
asr_precision = gr.Dropdown( | |
label = i18n("数据类型精度"), | |
choices = ["float32"], | |
interactive = True, | |
value="float32" | |
) | |
with gr.Row(): | |
asr_info = gr.Textbox(label=i18n("ASR进程输出信息")) | |
open_asr_button = gr.Button(i18n("开启离线批量ASR"), variant="primary",visible=True) | |
close_asr_button = gr.Button(i18n("终止ASR进程"), variant="primary",visible=False) | |
def change_lang_choices(key):  # update the selectable languages for the chosen model
    """Return a dropdown update with the model's languages, selecting the first."""
    # return gr.Dropdown(choices=asr_dict[key]['lang'])
    langs = asr_dict[key]['lang']
    return {"__type__": "update", "choices": langs, "value": langs[0]}
def change_size_choices(key):  # update the selectable model sizes for the chosen model
    """Return a dropdown update with the model's sizes, selecting the last."""
    # return gr.Dropdown(choices=asr_dict[key]['size'])
    sizes = asr_dict[key]['size']
    return {"__type__": "update", "choices": sizes, "value": sizes[-1]}
def change_precision_choices(key):
    """Return a Gradio update that refreshes the precision dropdown for ASR model *key*.

    The choices are ``asr_dict[key]['precision']``.  The selected value is
    ``'float32'`` unless the Faster Whisper model is chosen, in which case it is
    ``'int8'`` when ``default_batch_size <= 4``, otherwise ``'float16'`` when
    ``is_half`` is set.
    """
    precision = 'float32'
    if key == "Faster Whisper (多语种)":
        if default_batch_size <= 4:
            precision = 'int8'
        elif is_half:
            precision = 'float16'
    return {"__type__": "update", "choices": asr_dict[key]['precision'], "value": precision}
asr_model.change(change_lang_choices, [asr_model], [asr_lang]) | |
asr_model.change(change_size_choices, [asr_model], [asr_size]) | |
asr_model.change(change_precision_choices, [asr_model], [asr_precision]) | |
gr.Markdown(value=i18n("0d-语音文本校对标注工具")) | |
with gr.Row(): | |
if_label = gr.Checkbox(label=i18n("是否开启打标WebUI"),show_label=True) | |
path_list = gr.Textbox( | |
label=i18n(".list标注文件的路径"), | |
value="D:\\RVC1006\\GPT-SoVITS\\raw\\xxx.list", | |
interactive=True, | |
) | |
label_info = gr.Textbox(label=i18n("打标工具进程输出信息")) | |
if_label.change(change_label, [if_label,path_list], [label_info]) | |
if_uvr5.change(change_uvr5, [if_uvr5], [uvr5_info]) | |
open_asr_button.click(open_asr, [asr_inp_dir, asr_opt_dir, asr_model, asr_size, asr_lang, asr_precision], [asr_info,open_asr_button,close_asr_button,path_list]) | |
close_asr_button.click(close_asr, [], [asr_info,open_asr_button,close_asr_button]) | |
open_slicer_button.click(open_slice, [slice_inp_path,slice_opt_root,threshold,min_length,min_interval,hop_size,max_sil_kept,_max,alpha,n_process], [slicer_info,open_slicer_button,close_slicer_button,asr_inp_dir,denoise_input_dir]) | |
close_slicer_button.click(close_slice, [], [slicer_info,open_slicer_button,close_slicer_button]) | |
open_denoise_button.click(open_denoise, [denoise_input_dir,denoise_output_dir], [denoise_info,open_denoise_button,close_denoise_button,asr_inp_dir]) | |
close_denoise_button.click(close_denoise, [], [denoise_info,open_denoise_button,close_denoise_button]) | |
with gr.TabItem(i18n("1-GPT-SoVITS-TTS")): | |
with gr.Row(): | |
exp_name = gr.Textbox(label=i18n("*实验/模型名"), value="xxx", interactive=True) | |
gpu_info = gr.Textbox(label=i18n("显卡信息"), value=gpu_info, visible=True, interactive=False) | |
pretrained_s2G = gr.Textbox(label=i18n("预训练的SoVITS-G模型路径"), value=pretrained_sovits_name, interactive=True) | |
pretrained_s2D = gr.Textbox(label=i18n("预训练的SoVITS-D模型路径"), value=pretrained_sovits_name.replace("s2G","s2D"), interactive=True) | |
pretrained_s1 = gr.Textbox(label=i18n("预训练的GPT模型路径"), value=pretrained_gpt_name, interactive=True) | |
with gr.TabItem(i18n("1A-训练集格式化工具")): | |
gr.Markdown(value=i18n("输出logs/实验名目录下应有23456开头的文件和文件夹")) | |
with gr.Row(): | |
inp_text = gr.Textbox(label=i18n("*文本标注文件"),value=r"D:\RVC1006\GPT-SoVITS\raw\xxx.list",interactive=True) | |
inp_wav_dir = gr.Textbox( | |
label=i18n("*训练集音频文件目录"), | |
# value=r"D:\RVC1006\GPT-SoVITS\raw\xxx", | |
interactive=True, | |
placeholder=i18n("填切割后音频所在目录!读取的音频文件完整路径=该目录-拼接-list文件里波形对应的文件名(不是全路径)。如果留空则使用.list文件里的绝对全路径。") | |
) | |
gr.Markdown(value=i18n("1Aa-文本内容")) | |
with gr.Row(): | |
gpu_numbers1a = gr.Textbox(label=i18n("GPU卡号以-分割,每个卡号一个进程"),value="%s-%s"%(gpus,gpus),interactive=True) | |
bert_pretrained_dir = gr.Textbox(label=i18n("预训练的中文BERT模型路径"),value="GPT_SoVITS/pretrained_models/chinese-roberta-wwm-ext-large",interactive=False) | |
button1a_open = gr.Button(i18n("开启文本获取"), variant="primary",visible=True) | |
button1a_close = gr.Button(i18n("终止文本获取进程"), variant="primary",visible=False) | |
info1a=gr.Textbox(label=i18n("文本进程输出信息")) | |
gr.Markdown(value=i18n("1Ab-SSL自监督特征提取")) | |
with gr.Row(): | |
gpu_numbers1Ba = gr.Textbox(label=i18n("GPU卡号以-分割,每个卡号一个进程"),value="%s-%s"%(gpus,gpus),interactive=True) | |
cnhubert_base_dir = gr.Textbox(label=i18n("预训练的SSL模型路径"),value="GPT_SoVITS/pretrained_models/chinese-hubert-base",interactive=False) | |
button1b_open = gr.Button(i18n("开启SSL提取"), variant="primary",visible=True) | |
button1b_close = gr.Button(i18n("终止SSL提取进程"), variant="primary",visible=False) | |
info1b=gr.Textbox(label=i18n("SSL进程输出信息")) | |
gr.Markdown(value=i18n("1Ac-语义token提取")) | |
with gr.Row(): | |
gpu_numbers1c = gr.Textbox(label=i18n("GPU卡号以-分割,每个卡号一个进程"),value="%s-%s"%(gpus,gpus),interactive=True) | |
button1c_open = gr.Button(i18n("开启语义token提取"), variant="primary",visible=True) | |
button1c_close = gr.Button(i18n("终止语义token提取进程"), variant="primary",visible=False) | |
info1c=gr.Textbox(label=i18n("语义token提取进程输出信息")) | |
gr.Markdown(value=i18n("1Aabc-训练集格式化一键三连")) | |
with gr.Row(): | |
button1abc_open = gr.Button(i18n("开启一键三连"), variant="primary",visible=True) | |
button1abc_close = gr.Button(i18n("终止一键三连"), variant="primary",visible=False) | |
info1abc=gr.Textbox(label=i18n("一键三连进程输出信息")) | |
button1a_open.click(open1a, [inp_text,inp_wav_dir,exp_name,gpu_numbers1a,bert_pretrained_dir], [info1a,button1a_open,button1a_close]) | |
button1a_close.click(close1a, [], [info1a,button1a_open,button1a_close]) | |
button1b_open.click(open1b, [inp_text,inp_wav_dir,exp_name,gpu_numbers1Ba,cnhubert_base_dir], [info1b,button1b_open,button1b_close]) | |
button1b_close.click(close1b, [], [info1b,button1b_open,button1b_close]) | |
button1c_open.click(open1c, [inp_text,exp_name,gpu_numbers1c,pretrained_s2G], [info1c,button1c_open,button1c_close]) | |
button1c_close.click(close1c, [], [info1c,button1c_open,button1c_close]) | |
button1abc_open.click(open1abc, [inp_text,inp_wav_dir,exp_name,gpu_numbers1a,gpu_numbers1Ba,gpu_numbers1c,bert_pretrained_dir,cnhubert_base_dir,pretrained_s2G], [info1abc,button1abc_open,button1abc_close]) | |
button1abc_close.click(close1abc, [], [info1abc,button1abc_open,button1abc_close]) | |
with gr.TabItem(i18n("1B-微调训练")): | |
gr.Markdown(value=i18n("1Ba-SoVITS训练。用于分享的模型文件输出在SoVITS_weights下。")) | |
with gr.Row(): | |
batch_size = gr.Slider(minimum=1,maximum=40,step=1,label=i18n("每张显卡的batch_size"),value=default_batch_size,interactive=True) | |
total_epoch = gr.Slider(minimum=1,maximum=25,step=1,label=i18n("总训练轮数total_epoch,不建议太高"),value=8,interactive=True) | |
text_low_lr_rate = gr.Slider(minimum=0.2,maximum=0.6,step=0.05,label=i18n("文本模块学习率权重"),value=0.4,interactive=True) | |
save_every_epoch = gr.Slider(minimum=1,maximum=25,step=1,label=i18n("保存频率save_every_epoch"),value=4,interactive=True) | |
if_save_latest = gr.Checkbox(label=i18n("是否仅保存最新的ckpt文件以节省硬盘空间"), value=True, interactive=True, show_label=True) | |
if_save_every_weights = gr.Checkbox(label=i18n("是否在每次保存时间点将最终小模型保存至weights文件夹"), value=True, interactive=True, show_label=True) | |
gpu_numbers1Ba = gr.Textbox(label=i18n("GPU卡号以-分割,每个卡号一个进程"), value="%s" % (gpus), interactive=True) | |
with gr.Row(): | |
button1Ba_open = gr.Button(i18n("开启SoVITS训练"), variant="primary",visible=True) | |
button1Ba_close = gr.Button(i18n("终止SoVITS训练"), variant="primary",visible=False) | |
info1Ba=gr.Textbox(label=i18n("SoVITS训练进程输出信息")) | |
gr.Markdown(value=i18n("1Bb-GPT训练。用于分享的模型文件输出在GPT_weights下。")) | |
with gr.Row(): | |
batch_size1Bb = gr.Slider(minimum=1,maximum=40,step=1,label=i18n("每张显卡的batch_size"),value=default_batch_size,interactive=True) | |
total_epoch1Bb = gr.Slider(minimum=2,maximum=50,step=1,label=i18n("总训练轮数total_epoch"),value=15,interactive=True) | |
if_dpo = gr.Checkbox(label=i18n("是否开启dpo训练选项(实验性)"), value=False, interactive=True, show_label=True) | |
if_save_latest1Bb = gr.Checkbox(label=i18n("是否仅保存最新的ckpt文件以节省硬盘空间"), value=True, interactive=True, show_label=True) | |
if_save_every_weights1Bb = gr.Checkbox(label=i18n("是否在每次保存时间点将最终小模型保存至weights文件夹"), value=True, interactive=True, show_label=True) | |
save_every_epoch1Bb = gr.Slider(minimum=1,maximum=50,step=1,label=i18n("保存频率save_every_epoch"),value=5,interactive=True) | |
gpu_numbers1Bb = gr.Textbox(label=i18n("GPU卡号以-分割,每个卡号一个进程"), value="%s" % (gpus), interactive=True) | |
with gr.Row(): | |
button1Bb_open = gr.Button(i18n("开启GPT训练"), variant="primary",visible=True) | |
button1Bb_close = gr.Button(i18n("终止GPT训练"), variant="primary",visible=False) | |
info1Bb=gr.Textbox(label=i18n("GPT训练进程输出信息")) | |
button1Ba_open.click(open1Ba, [batch_size,total_epoch,exp_name,text_low_lr_rate,if_save_latest,if_save_every_weights,save_every_epoch,gpu_numbers1Ba,pretrained_s2G,pretrained_s2D], [info1Ba,button1Ba_open,button1Ba_close]) | |
button1Ba_close.click(close1Ba, [], [info1Ba,button1Ba_open,button1Ba_close]) | |
button1Bb_open.click(open1Bb, [batch_size1Bb,total_epoch1Bb,exp_name,if_dpo,if_save_latest1Bb,if_save_every_weights1Bb,save_every_epoch1Bb,gpu_numbers1Bb,pretrained_s1], [info1Bb,button1Bb_open,button1Bb_close]) | |
button1Bb_close.click(close1Bb, [], [info1Bb,button1Bb_open,button1Bb_close]) | |
with gr.TabItem(i18n("1C-推理")): | |
gr.Markdown(value=i18n("选择训练完存放在SoVITS_weights和GPT_weights下的模型。默认的一个是底模,体验5秒Zero Shot TTS用。")) | |
with gr.Row(): | |
GPT_dropdown = gr.Dropdown(label=i18n("*GPT模型列表"), choices=sorted(GPT_names,key=custom_sort_key),value=pretrained_gpt_name,interactive=True) | |
SoVITS_dropdown = gr.Dropdown(label=i18n("*SoVITS模型列表"), choices=sorted(SoVITS_names,key=custom_sort_key),value=pretrained_sovits_name,interactive=True) | |
gpu_number_1C=gr.Textbox(label=i18n("GPU卡号,只能填1个整数"), value=gpus, interactive=True) | |
refresh_button = gr.Button(i18n("刷新模型路径"), variant="primary") | |
refresh_button.click(fn=change_choices,inputs=[],outputs=[SoVITS_dropdown,GPT_dropdown]) | |
with gr.Row(): | |
if_tts = gr.Checkbox(label=i18n("是否开启TTS推理WebUI"), show_label=True) | |
tts_info = gr.Textbox(label=i18n("TTS推理WebUI进程输出信息")) | |
if_tts.change(change_tts_inference, [if_tts,bert_pretrained_dir,cnhubert_base_dir,gpu_number_1C,GPT_dropdown,SoVITS_dropdown], [tts_info]) | |
with gr.TabItem(i18n("2-GPT-SoVITS-变声")):gr.Markdown(value=i18n("施工中,请静候佳音")) | |
# app.queue(concurrency_count=511, max_size=1022).launch( | |
# server_name="0.0.0.0", | |
# inbrowser=True, | |
# share=is_share, | |
# server_port=webui_port_main, | |
# quiet=True, | |
# ) | |
# button1abc_open.click(open1abc, [inp_text,inp_wav_dir,exp_name,gpu_numbers1a,gpu_numbers1Ba,gpu_numbers1c,bert_pretrained_dir,cnhubert_base_dir,pretrained_s2G], [info1abc,button1abc_open,button1abc_close]) | |
# open1abc(./*/transcript.txt, None, *, "0-1", "0-1", "0-1", "GPT_SoVITS/pretrained_models/chinese-roberta-wwm-ext-large", "GPT_SoVITS/pretrained_models/chinese-hubert-base", "GPT_SoVITS/pretrained_models/s2G2333k.pth") | |
import os | |
import sys | |
# to avoid the modified user.pth file | |
# Locations of the pretrained SSL (HuBERT) and BERT checkpoints, relative to the
# working directory.  Forward slashes are used so the paths resolve on both
# Windows and POSIX and contain no invalid backslash escape sequences.
cnhubert_base_path = "GPT_SoVITS/pretrained_models/chinese-hubert-base"
bert_path = "GPT_SoVITS/pretrained_models/chinese-roberta-wwm-ext-large"
os.environ["version"] = 'v2' | |
now_dir = os.getcwd() | |
sys.path.insert(0, now_dir) | |
import gradio as gr | |
from transformers import AutoModelForMaskedLM, AutoTokenizer | |
import numpy as np | |
from pathlib import Path | |
import os,librosa,torch | |
from scipy.io.wavfile import write as wavwrite | |
from GPT_SoVITS.feature_extractor import cnhubert | |
cnhubert.cnhubert_base_path=cnhubert_base_path | |
from GPT_SoVITS.module.models import SynthesizerTrn | |
from GPT_SoVITS.AR.models.t2s_lightning_module import Text2SemanticLightningModule | |
from GPT_SoVITS.text import cleaned_text_to_sequence | |
from GPT_SoVITS.text.cleaner import clean_text | |
import GPT_SoVITS.utils | |
from time import time as ttime | |
from GPT_SoVITS.module.mel_processing import spectrogram_torch | |
import tempfile | |
from tools.my_utils import load_audio | |
import os | |
import json | |
################ End strange import and user.pth modification ################ | |
# import pyopenjtalk | |
# cwd = os.getcwd() | |
# if os.path.exists(os.path.join(cwd,'user.dic')): | |
# pyopenjtalk.update_global_jtalk_with_user_dict(os.path.join(cwd, 'user.dic')) | |
import logging | |
# Raise the threshold of these loggers to WARNING so their lower-level
# messages are not emitted.
for _quiet_logger in ('httpx', 'httpcore', 'multipart'):
    logging.getLogger(_quiet_logger).setLevel(logging.WARNING)
# Run on the GPU when CUDA is available, otherwise on the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
# Half precision is disabled for this demo.
is_half = False

# Load the BERT tokenizer and masked-LM model from the local checkpoint and
# move the model to the selected device (converted to fp16 first when is_half).
tokenizer = AutoTokenizer.from_pretrained(bert_path)
bert_model = AutoModelForMaskedLM.from_pretrained(bert_path)
if is_half:
    bert_model = bert_model.half()
bert_model = bert_model.to(device)
def get_bert_feature(text, word2ph):
    """Compute phone-level BERT features for *text* (BERT, not the HuBERT features).

    Args:
        text: The text to encode; must have one character per ``word2ph`` entry.
        word2ph: For each character of ``text``, how many times its feature
            row is repeated in the output.

    Returns:
        The transposed concatenation of the repeated per-character feature
        rows, as a CPU tensor.

    Raises:
        AssertionError: If ``len(word2ph) != len(text)``.
    """
    with torch.no_grad():
        encoded = tokenizer(text, return_tensors="pt")
        for field in encoded:
            # The inputs are long tensors, so precision is not a concern here;
            # the compute precision follows bert_model.
            encoded[field] = encoded[field].to(device)
        outputs = bert_model(**encoded, output_hidden_states=True)
        # Third-from-last hidden state of the first batch item, with the first
        # and last positions dropped (presumably the special tokens).
        res = torch.cat(outputs["hidden_states"][-3:-2], -1)[0].cpu()[1:-1]
    assert len(word2ph) == len(text)
    per_phone = [res[pos].repeat(count, 1) for pos, count in enumerate(word2ph)]
    return torch.cat(per_phone, dim=0).T
# Caches of checkpoints already loaded by load_model, so a weight file shared
# by several entries is only read and instantiated once.
loaded_sovits_model = []  # [(path, dict, model)]
loaded_gpt_model = []

# Shared SSL (HuBERT) model, moved to the selected device (fp16 when is_half).
ssl_model = cnhubert.get_model()
if is_half:
    ssl_model = ssl_model.half()
ssl_model = ssl_model.to(device)
def load_model(sovits_path, gpt_path):
    """Load a SoVITS / GPT model pair, reusing already-loaded checkpoints.

    Checkpoints are looked up by path in the module-level caches
    ``loaded_sovits_model`` and ``loaded_gpt_model``; on a miss the checkpoint
    is read with ``torch.load``, the model is built, moved to ``device``
    (fp16 when ``is_half``), put in eval mode and appended to the cache.

    Args:
        sovits_path: Path of the SoVITS checkpoint (must hold ``"config"`` and
            ``"weight"`` entries).
        gpt_path: Path of the GPT checkpoint (must hold ``"config"`` and
            ``"weight"`` entries).

    Returns:
        ``(vq_model, ssl_model, t2s_model, hps, config, hz, max_sec)`` where
        ``ssl_model`` is the shared module-level SSL model, ``hps`` is the
        SoVITS config wrapped for attribute access, ``config`` is the GPT
        config, ``hz`` is the constant 50 and ``max_sec`` comes from
        ``config['data']['max_sec']``.
    """
    vq_model = None
    t2s_model = None
    dict_s2 = None
    dict_s1 = None

    for cached_path, cached_dict, cached_model in loaded_sovits_model:
        if cached_path == sovits_path:
            vq_model = cached_model
            dict_s2 = cached_dict
            break
    for cached_path, cached_dict, cached_model in loaded_gpt_model:
        if cached_path == gpt_path:
            t2s_model = cached_model
            dict_s1 = cached_dict
            break

    # NOTE(review): torch.load unpickles the file; only load checkpoints from
    # trusted sources.
    if dict_s2 is None:
        dict_s2 = torch.load(sovits_path, map_location="cpu")
    hps = dict_s2["config"]
    if dict_s1 is None:
        dict_s1 = torch.load(gpt_path, map_location="cpu")
    config = dict_s1["config"]

    class DictToAttrRecursive:
        """Expose the entries of a mapping as attributes, recursing into dicts."""

        def __init__(self, input_dict):
            for key, value in input_dict.items():
                if isinstance(value, dict):
                    # Nested dicts are wrapped recursively.
                    setattr(self, key, DictToAttrRecursive(value))
                else:
                    setattr(self, key, value)

    hps = DictToAttrRecursive(hps)
    hps.model.semantic_frame_rate = "25hz"

    if vq_model is None:
        vq_model = SynthesizerTrn(
            hps.data.filter_length // 2 + 1,
            hps.train.segment_size // hps.data.hop_length,
            n_speakers=hps.data.n_speakers,
            **hps.model)
        if is_half:
            vq_model = vq_model.half().to(device)
        else:
            vq_model = vq_model.to(device)
        vq_model.eval()
        # strict=False: the checkpoint is allowed to differ from the module's
        # exact set of keys.
        vq_model.load_state_dict(dict_s2["weight"], strict=False)
        loaded_sovits_model.append((sovits_path, dict_s2, vq_model))

    hz = 50
    max_sec = config['data']['max_sec']

    if t2s_model is None:
        t2s_model = Text2SemanticLightningModule(config, "ojbk", is_train=False)
        t2s_model.load_state_dict(dict_s1["weight"])
        if is_half:
            t2s_model = t2s_model.half()
        t2s_model = t2s_model.to(device)
        t2s_model.eval()
        loaded_gpt_model.append((gpt_path, dict_s1, t2s_model))

    return vq_model, ssl_model, t2s_model, hps, config, hz, max_sec
def get_spepc(hps, filename):
    """Return the spectrogram of the peak-normalized audio stored in *filename*.

    Args:
        hps: Hyper-parameters; ``hps.data`` supplies ``sampling_rate``,
            ``filter_length``, ``hop_length`` and ``win_length``.
        filename: Path of the audio file, loaded at ``hps.data.sampling_rate``.

    Returns:
        The result of ``spectrogram_torch`` for the audio with a leading batch
        dimension of 1.
    """
    audio = load_audio(filename, int(hps.data.sampling_rate))
    peak = np.max(np.abs(audio))
    # Only normalize when there is signal: dividing an all-zero clip by its
    # zero peak would fill the spectrogram with NaNs.
    if peak > 0:
        audio = audio / peak
    audio_norm = torch.FloatTensor(audio).unsqueeze(0)
    spec = spectrogram_torch(
        audio_norm,
        hps.data.filter_length,
        hps.data.sampling_rate,
        hps.data.hop_length,
        hps.data.win_length,
        center=False,
    )
    return spec
def create_tts_fn(vq_model, ssl_model, t2s_model, hps, config, hz, max_sec):
    """Build the synthesis callback bound to one loaded model pair.

    Args:
        vq_model: SoVITS model used to extract prompt codes and decode audio.
        ssl_model: SSL model whose ``.model`` encodes the 16 kHz reference.
        t2s_model: GPT model whose ``.model.infer_panel`` predicts semantics.
        hps: SoVITS hyper-parameters (``hps.data.sampling_rate`` is the output rate).
        config: GPT config; ``config['inference']['top_k']`` is used for sampling.
        hz: Multiplied by ``max_sec`` to form ``early_stop_num``.
        max_sec: See ``hz``.

    Returns:
        ``tts_fn``, which always returns a 3-tuple
        ``(message, (sampling_rate, int16_samples) | None, wav_path | None)``.
    """
    import re

    def _filename_part(text):
        # First 8 characters of *text* with whitespace, the punctuation the
        # prefix always replaced, and path-unsafe characters turned into "_".
        return re.sub(r'[\s?!,\\/:*"<>|]', "_", (text or "")[:8])

    def tts_fn(ref_wav_path, prompt_text, prompt_language, target_phone, text_language, target_text = None):
        """Synthesize *target_phone* in the voice of the reference audio.

        Args:
            ref_wav_path: Path of the reference audio file.
            prompt_text: Transcript of the reference audio.
            prompt_language: Language passed to ``clean_text`` for the prompt.
            target_phone: Phoneme string; sentences separated by newlines,
                phonemes by spaces.
            text_language: Language forwarded to ``get_phone_from_str_list``.
            target_text: Original text; only used to name the output file.

        Returns:
            ``("Success", (sampling_rate, samples), wav_path)`` on success, or
            ``(error_message, None, None)`` on failure.
        """
        prompt_text = prompt_text.strip()

        with torch.no_grad():
            wav16k, sr = librosa.load(ref_wav_path, sr=16000, mono=False)
            if wav16k.ndim == 2:
                # Down-mix by averaging the first two channels.
                wav16k = (wav16k[0] + wav16k[1]) / 2
            # Append trailing silence.  The length is 0.3 s worth of samples at
            # hps.data.sampling_rate, while the waveform itself is at 16 kHz.
            wav16k = np.concatenate([wav16k, np.zeros(int(hps.data.sampling_rate * 0.3)),])
            wav16k = torch.from_numpy(wav16k).float()
            if is_half:
                wav16k = wav16k.half().to(device)
            else:
                wav16k = wav16k.to(device)
            ssl_content = ssl_model.model(wav16k.unsqueeze(0))["last_hidden_state"].transpose(1, 2)
            codes = vq_model.extract_latent(ssl_content)
            prompt_semantic = codes[0, 0]

        phones1, word2ph1, norm_text1 = clean_text(prompt_text, prompt_language)
        phones1 = cleaned_text_to_sequence(phones1)

        # Values that do not depend on the sentence are computed once.
        # The BERT features are all zeros: BERT conditioning is not used here.
        bert1 = torch.zeros((1024, len(phones1)), dtype=torch.float16 if is_half else torch.float32).to(device)
        prompt = prompt_semantic.unsqueeze(0).to(device)
        refer = get_spepc(hps, ref_wav_path)
        if is_half:
            refer = refer.half().to(device)
        else:
            refer = refer.to(device)
        # Equal gain on both output channels (turns the mono result into stereo).
        direction = np.array([1, 1])
        # 0.3 s of stereo silence inserted after every sentence.
        zero_wav = np.zeros((2, int(hps.data.sampling_rate * 0.3)), dtype=np.float16 if is_half else np.float32)

        audio_opt = []
        for phones2 in get_phone_from_str_list(target_phone, text_language):
            # Skip blank lines.
            if len(phones2) == 0:
                continue
            if len(phones2) == 1 and phones2[0] == "":
                continue
            phones2 = cleaned_text_to_sequence(phones2)
            bert2 = torch.zeros((1024, len(phones2))).to(bert1)
            bert = torch.cat([bert1, bert2], 1).to(device).unsqueeze(0)
            all_phoneme_ids = torch.LongTensor(phones1 + phones2).to(device).unsqueeze(0)
            all_phoneme_len = torch.tensor([all_phoneme_ids.shape[-1]]).to(device)

            # An idx of 0 means nothing was generated; retry once before giving up.
            idx = 0
            attempts = 0
            while idx == 0 and attempts < 2:
                with torch.no_grad():
                    pred_semantic, idx = t2s_model.model.infer_panel(
                        all_phoneme_ids,
                        all_phoneme_len,
                        prompt,
                        bert,
                        top_k=config['inference']['top_k'],
                        early_stop_num=hz * max_sec)
                attempts += 1
            if idx == 0:
                # Three values, one per Gradio output component.
                return "Error: Generation failure: bad zero prediction.", None, None

            # Keep only the last idx tokens; the extra unsqueeze is required by decode.
            pred_semantic = pred_semantic[:, -idx:].unsqueeze(0)
            # Decode with the target phonemes only (the prompt part is left out).
            audio = vq_model.decode(
                pred_semantic,
                torch.LongTensor(phones2).to(device).unsqueeze(0),
                refer,
            ).detach().cpu().numpy()[0, 0]
            audio = np.expand_dims(audio, 0) * direction[:, np.newaxis]
            audio_opt.append(audio)
            audio_opt.append(zero_wav)

        if not audio_opt:
            return "Error: No phonemes to synthesize.", None, None

        # Scale to 16-bit PCM, clipping so out-of-range samples cannot wrap around.
        pcm = np.clip(np.concatenate(audio_opt, axis=1) * 32768, -32768, 32767).astype(np.int16).T
        audio = (hps.data.sampling_rate, pcm)

        prefix_1 = _filename_part(prompt_text)
        prefix_2 = _filename_part(target_text)
        # mkstemp creates the file atomically, unlike the deprecated mktemp.
        fd, filename = tempfile.mkstemp(suffix=".wav", prefix=f"{prefix_1}_{prefix_2}_")
        os.close(fd)
        wavwrite(filename, audio[0], audio[1])
        return "Success", audio, filename

    return tts_fn
def get_str_list_from_phone(text, text_language):
    """Run G2P on raw text and return the phonemes as a string.

    *text* is a paragraph that may hold several sentences, one per line.  Each
    line is passed through ``clean_text``; its phonemes are joined with spaces
    and the lines are joined with newlines.  The raw text is printed first.
    """
    print(text)
    rendered = []
    for sentence in text.split("\n"):
        sentence_phones, _word2ph, _norm_text = clean_text(sentence, text_language)
        rendered.append(" ".join(sentence_phones))
    return "\n".join(rendered)
def get_phone_from_str_list(str_list:str, language:str = 'ja'):
    """Parse a phoneme string back into a list of phoneme lists.

    *str_list* is a paragraph that may hold several sentences: sentences are
    separated by newlines and phonemes by single spaces.  *language* is
    accepted but not used.
    """
    return [sentence.split(" ") for sentence in str_list.split("\n")]
# Characters that end a segment.
splits={",","。","?","!",",",".","?","!","~",":",":","—","…",}


def split(todo_text):
    """Cut *todo_text* into segments that each end with a character from ``splits``.

    Doubled ellipses and doubled dashes are first collapsed to a single
    full stop / comma, and a full stop is appended when the text does not
    already end with a splitting character, so no trailing remainder is lost.

    Returns:
        The list of segments (each including its closing punctuation), or an
        empty list for empty input.
    """
    todo_text = todo_text.replace("……", "。").replace("——", ",")
    if not todo_text:
        # Nothing to split; indexing the last character would raise otherwise.
        return []
    if todo_text[-1] not in splits:
        todo_text += "。"
    todo_texts = []
    segment_start = 0
    for position, char in enumerate(todo_text):
        if char in splits:
            todo_texts.append(todo_text[segment_start:position + 1])
            segment_start = position + 1
    # The text is guaranteed to end with punctuation, so the final segment has
    # already been appended by the loop.
    return todo_texts
def change_reference_audio(prompt_text, transcripts):
    """Return the reference audio path registered for the transcript *prompt_text*.

    Raises:
        KeyError: If *prompt_text* is not a key of *transcripts*.
    """
    audio_path = transcripts[prompt_text]
    return audio_path
# Registry of the voices offered by the demo.  Each entry is
# (key, title, cover, transcripts, example_reference, tts_fn).
models = []
with open("./models/models_info.json", "r", encoding="utf-8") as models_info_file:
    models_info = json.load(models_info_file)

for i, info in models_info.items():
    title = info['title']
    cover = info['cover']
    gpt_weight = info['gpt_weight']
    sovits_weight = info['sovits_weight']
    example_reference = info['example_reference']

    # Map each transcript text to its audio file, which is expected in the
    # "reference_audio" directory next to the transcript file.
    transcripts = {}
    transcript_path = info["transcript_path"]
    path = os.path.dirname(transcript_path)
    with open(transcript_path, 'r', encoding='utf-8') as transcript_file:
        for line in transcript_file:
            line = line.strip().replace("\\", "/")
            if not line:
                # Blank lines would otherwise register an empty transcript.
                continue
            # "|"-separated record: first field is the wav path, last is the text.
            items = line.split("|")
            wav, t = items[0], items[-1]
            wav = os.path.basename(wav)
            transcripts[t] = os.path.join(path, "reference_audio", wav)

    vq_model, ssl_model, t2s_model, hps, config, hz, max_sec = load_model(sovits_weight, gpt_weight)
    models.append(
        (
            i,
            title,
            cover,
            transcripts,
            example_reference,
            create_tts_fn(
                vq_model, ssl_model, t2s_model, hps, config, hz, max_sec
            )
        )
    )
# Demo UI: one tab per entry of `models`, each with a reference-audio picker on
# the left and the text / synthesis controls on the right.
# NOTE(review): the Row/Column nesting below is the most plausible reading of
# the layout — confirm against the rendered page.
with gr.Blocks() as app:
    gr.Markdown(
        "# <center> GPT-SoVITS Demo\n"
    )
    with gr.Tabs():
        for (name, title, cover, transcripts, example_reference, tts_fn) in models:
            with gr.TabItem(name):
                with gr.Row():
                    gr.Markdown(
                        '<div align="center">'
                        f'<a><strong>{title}</strong></a>'
                        '</div>')
                with gr.Row():
                    with gr.Column():
                        # Fall back to the first transcript when the configured
                        # example is not among the loaded transcripts.
                        prompt_text = gr.Dropdown(
                            label="Transcript of the Reference Audio",
                            value=example_reference if example_reference in transcripts else list(transcripts.keys())[0],
                            choices=list(transcripts.keys())
                        )
                        inp_ref_audio = gr.Audio(
                            label="Reference Audio",
                            type="filepath",
                            interactive=False,
                            value=transcripts[example_reference] if example_reference in transcripts else list(transcripts.values())[0]
                        )
                        # The transcript -> audio path mapping is passed to the
                        # callback through a State component.
                        transcripts_state = gr.State(value=transcripts)
                        # Selecting another transcript swaps the reference audio.
                        prompt_text.change(
                            fn=change_reference_audio,
                            inputs=[prompt_text, transcripts_state],
                            outputs=[inp_ref_audio]
                        )
                        # The prompt language is fixed to Japanese.
                        prompt_language = gr.State(value="ja")
                    with gr.Column():
                        text = gr.Textbox(label="Input Text", value="私はお兄ちゃんのだいだいだーいすきな妹なんだから、言うことなんでも聞いてくれますよね!")
                        text_language = gr.Dropdown(
                            label="Language",
                            choices=["ja"],
                            value="ja"
                        )
                        clean_button = gr.Button("Clean Text", variant="primary")
                        inference_button = gr.Button("Generate", variant="primary")
                        cleaned_text = gr.Textbox(label="Cleaned Text")
                        output = gr.Audio(label="Output Audio")
                        output_file = gr.File(label="Output Audio File")
                        om = gr.Textbox(label="Output Message")
                        # "Clean Text" converts the input text to a phoneme string.
                        clean_button.click(
                            fn=get_str_list_from_phone,
                            inputs=[text, text_language],
                            outputs=[cleaned_text]
                        )
                        # "Generate" synthesizes from the phoneme string in
                        # cleaned_text; the raw text is passed as the last input.
                        inference_button.click(
                            fn=tts_fn,
                            inputs=[inp_ref_audio, prompt_text, prompt_language, cleaned_text, text_language, text],
                            outputs=[om, output, output_file]
                        )
# share=True requests a public share link in addition to the local server.
app.launch(share=True)