Applio13 / rvc /infer /infer.py
Aitron Emper
Upload 74 files
1a7d583 verified
raw
history blame
9.79 kB
import os
import sys
import time
import torch
import logging
import numpy as np
import soundfile as sf
import librosa
now_dir = os.getcwd()
sys.path.append(now_dir)
from rvc.infer.pipeline import VC
from scipy.io import wavfile
import noisereduce as nr
from rvc.lib.utils import load_audio
from rvc.lib.tools.split_audio import process_audio, merge_audio
from fairseq import checkpoint_utils
from rvc.lib.infer_pack.models import (
SynthesizerTrnMs256NSFsid,
SynthesizerTrnMs256NSFsid_nono,
SynthesizerTrnMs768NSFsid,
SynthesizerTrnMs768NSFsid_nono,
)
from rvc.configs.config import Config
logging.getLogger("fairseq").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
config = Config()
hubert_model = None
tgt_sr = None
net_g = None
vc = None
cpt = None
version = None
n_spk = None
def load_hubert():
global hubert_model
models, _, _ = checkpoint_utils.load_model_ensemble_and_task(
["hubert_base.pt"],
suffix="",
)
hubert_model = models[0]
hubert_model = hubert_model.to(config.device)
if config.is_half:
hubert_model = hubert_model.half()
else:
hubert_model = hubert_model.float()
hubert_model.eval()
def remove_audio_noise(input_audio_path, reduction_strength=0.7):
try:
rate, data = wavfile.read(input_audio_path)
reduced_noise = nr.reduce_noise(
y=data,
sr=rate,
prop_decrease=reduction_strength,
)
return reduced_noise
except Exception as error:
print(f"Error cleaning audio: {error}")
return None
def convert_audio_format(input_path, output_path, output_format):
try:
if output_format != "WAV":
print(f"Converting audio to {output_format} format...")
audio, sample_rate = librosa.load(input_path, sr=None)
common_sample_rates = [
8000,
11025,
12000,
16000,
22050,
24000,
32000,
44100,
48000,
]
target_sr = min(common_sample_rates, key=lambda x: abs(x - sample_rate))
audio = librosa.resample(audio, orig_sr=sample_rate, target_sr=target_sr)
sf.write(output_path, audio, target_sr, format=output_format.lower())
return output_path
except Exception as error:
print(f"Failed to convert audio to {output_format} format: {error}")
def vc_single(
sid=0,
input_audio_path=None,
f0_up_key=None,
f0_file=None,
f0_method=None,
file_index=None,
index_rate=None,
resample_sr=0,
rms_mix_rate=None,
protect=None,
hop_length=None,
output_path=None,
split_audio=False,
f0autotune=False,
filter_radius=None,
):
global tgt_sr, net_g, vc, hubert_model, version
f0_up_key = int(f0_up_key)
try:
audio = load_audio(input_audio_path, 16000)
audio_max = np.abs(audio).max() / 0.95
if audio_max > 1:
audio /= audio_max
if not hubert_model:
load_hubert()
if_f0 = cpt.get("f0", 1)
file_index = (
file_index.strip(" ")
.strip('"')
.strip("\n")
.strip('"')
.strip(" ")
.replace("trained", "added")
)
if tgt_sr != resample_sr >= 16000:
tgt_sr = resample_sr
if split_audio == "True":
result, new_dir_path = process_audio(input_audio_path)
if result == "Error":
return "Error with Split Audio", None
dir_path = (
new_dir_path.strip(" ").strip('"').strip("\n").strip('"').strip(" ")
)
if dir_path != "":
paths = [
os.path.join(root, name)
for root, _, files in os.walk(dir_path, topdown=False)
for name in files
if name.endswith(".wav") and root == dir_path
]
try:
for path in paths:
vc_single(
sid,
path,
f0_up_key,
None,
f0_method,
file_index,
index_rate,
resample_sr,
rms_mix_rate,
protect,
hop_length,
path,
False,
f0autotune,
)
except Exception as error:
print(error)
return f"Error {error}"
print("Finished processing segmented audio, now merging audio...")
merge_timestamps_file = os.path.join(
os.path.dirname(new_dir_path),
f"{os.path.basename(input_audio_path).split('.')[0]}_timestamps.txt",
)
tgt_sr, audio_opt = merge_audio(merge_timestamps_file)
os.remove(merge_timestamps_file)
else:
audio_opt = vc.pipeline(
hubert_model,
net_g,
sid,
audio,
input_audio_path,
f0_up_key,
f0_method,
file_index,
index_rate,
if_f0,
filter_radius,
tgt_sr,
resample_sr,
rms_mix_rate,
version,
protect,
hop_length,
f0autotune,
f0_file=f0_file,
)
if output_path is not None:
sf.write(output_path, audio_opt, tgt_sr, format="WAV")
return (tgt_sr, audio_opt)
except Exception as error:
print(error)
def get_vc(weight_root, sid):
global n_spk, tgt_sr, net_g, vc, cpt, version
if sid == "" or sid == []:
global hubert_model
if hubert_model is not None:
print("clean_empty_cache")
del net_g, n_spk, vc, hubert_model, tgt_sr
hubert_model = net_g = n_spk = vc = hubert_model = tgt_sr = None
if torch.cuda.is_available():
torch.cuda.empty_cache()
if_f0 = cpt.get("f0", 1)
version = cpt.get("version", "v1")
if version == "v1":
if if_f0 == 1:
net_g = SynthesizerTrnMs256NSFsid(
*cpt["config"], is_half=config.is_half
)
else:
net_g = SynthesizerTrnMs256NSFsid_nono(*cpt["config"])
elif version == "v2":
if if_f0 == 1:
net_g = SynthesizerTrnMs768NSFsid(
*cpt["config"], is_half=config.is_half
)
else:
net_g = SynthesizerTrnMs768NSFsid_nono(*cpt["config"])
del net_g, cpt
if torch.cuda.is_available():
torch.cuda.empty_cache()
cpt = None
person = weight_root
cpt = torch.load(person, map_location="cpu")
tgt_sr = cpt["config"][-1]
cpt["config"][-3] = cpt["weight"]["emb_g.weight"].shape[0]
if_f0 = cpt.get("f0", 1)
version = cpt.get("version", "v1")
if version == "v1":
if if_f0 == 1:
net_g = SynthesizerTrnMs256NSFsid(*cpt["config"], is_half=config.is_half)
else:
net_g = SynthesizerTrnMs256NSFsid_nono(*cpt["config"])
elif version == "v2":
if if_f0 == 1:
net_g = SynthesizerTrnMs768NSFsid(*cpt["config"], is_half=config.is_half)
else:
net_g = SynthesizerTrnMs768NSFsid_nono(*cpt["config"])
del net_g.enc_q
print(net_g.load_state_dict(cpt["weight"], strict=False))
net_g.eval().to(config.device)
if config.is_half:
net_g = net_g.half()
else:
net_g = net_g.float()
vc = VC(tgt_sr, config)
n_spk = cpt["config"][-3]
def infer_pipeline(
f0up_key,
filter_radius,
index_rate,
rms_mix_rate,
protect,
hop_length,
f0method,
audio_input_path,
audio_output_path,
model_path,
index_path,
split_audio,
f0autotune,
clean_audio,
clean_strength,
export_format,
):
global tgt_sr, net_g, vc, cpt
get_vc(model_path, 0)
try:
start_time = time.time()
vc_single(
sid=0,
input_audio_path=audio_input_path,
f0_up_key=f0up_key,
f0_file=None,
f0_method=f0method,
file_index=index_path,
index_rate=index_rate,
rms_mix_rate=rms_mix_rate,
protect=protect,
hop_length=hop_length,
output_path=audio_output_path,
split_audio=split_audio,
f0autotune=f0autotune,
filter_radius=filter_radius,
)
if clean_audio == "True":
cleaned_audio = remove_audio_noise(audio_output_path, clean_strength)
if cleaned_audio is not None:
sf.write(audio_output_path, cleaned_audio, tgt_sr, format="WAV")
output_path_format = audio_output_path.replace(
".wav", f".{export_format.lower()}"
)
audio_output_path = convert_audio_format(
audio_output_path, output_path_format, export_format
)
end_time = time.time()
elapsed_time = end_time - start_time
print(
f"Conversion completed. Output file: '{audio_output_path}' in {elapsed_time:.2f} seconds."
)
except Exception as error:
print(f"Voice conversion failed: {error}")