import os import sys import time import torch import librosa import logging import traceback import numpy as np import soundfile as sf import noisereduce as nr from pedalboard import ( Pedalboard, Chorus, Distortion, Reverb, PitchShift, Limiter, Gain, Bitcrush, Clipping, Compressor, Delay, ) now_dir = os.getcwd() sys.path.append(now_dir) from rvc.infer.pipeline import Pipeline as VC from rvc.lib.utils import load_audio_infer, load_embedding from rvc.lib.tools.split_audio import process_audio, merge_audio from rvc.lib.algorithm.synthesizers import Synthesizer from rvc.configs.config import Config logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("httpcore").setLevel(logging.WARNING) logging.getLogger("faiss").setLevel(logging.WARNING) logging.getLogger("faiss.loader").setLevel(logging.WARNING) class VoiceConverter: """ A class for performing voice conversion using the Retrieval-Based Voice Conversion (RVC) method. """ def __init__(self): """ Initializes the VoiceConverter with default configuration, and sets up models and parameters. """ self.config = Config() # Load RVC configuration self.hubert_model = ( None # Initialize the Hubert model (for embedding extraction) ) self.last_embedder_model = None # Last used embedder model self.tgt_sr = None # Target sampling rate for the output audio self.net_g = None # Generator network for voice conversion self.vc = None # Voice conversion pipeline instance self.cpt = None # Checkpoint for loading model weights self.version = None # Model version self.n_spk = None # Number of speakers in the model self.use_f0 = None # Whether the model uses F0 self.loaded_model = None def load_hubert(self, embedder_model: str, embedder_model_custom: str = None): """ Loads the HuBERT model for speaker embedding extraction. Args: embedder_model (str): Path to the pre-trained HuBERT model. embedder_model_custom (str): Path to the custom HuBERT model. """ self.hubert_model = load_embedding(embedder_model, embedder_model_custom) self.hubert_model.to(self.config.device) self.hubert_model = ( self.hubert_model.half() if self.config.is_half else self.hubert_model.float() ) self.hubert_model.eval() @staticmethod def remove_audio_noise(data, sr, reduction_strength=0.7): """ Removes noise from an audio file using the NoiseReduce library. Args: data (numpy.ndarray): The audio data as a NumPy array. sr (int): The sample rate of the audio data. reduction_strength (float): Strength of the noise reduction. Default is 0.7. """ try: reduced_noise = nr.reduce_noise( y=data, sr=sr, prop_decrease=reduction_strength ) return reduced_noise except Exception as error: print(f"An error occurred removing audio noise: {error}") return None @staticmethod def convert_audio_format(input_path, output_path, output_format): """ Converts an audio file to a specified output format. Args: input_path (str): Path to the input audio file. output_path (str): Path to the output audio file. output_format (str): Desired audio format (e.g., "WAV", "MP3"). """ try: if output_format != "WAV": print(f"Converting audio to {output_format} format...") audio, sample_rate = librosa.load(input_path, sr=None) common_sample_rates = [ 8000, 11025, 12000, 16000, 22050, 24000, 32000, 44100, 48000, ] target_sr = min(common_sample_rates, key=lambda x: abs(x - sample_rate)) audio = librosa.resample( audio, orig_sr=sample_rate, target_sr=target_sr ) sf.write(output_path, audio, target_sr, format=output_format.lower()) return output_path except Exception as error: print(f"An error occurred converting the audio format: {error}") @staticmethod def post_process_audio( audio_input, sample_rate, **kwargs, ): board = Pedalboard() if kwargs.get("reverb", False): reverb = Reverb( room_size=kwargs.get("reverb_room_size", 0.5), damping=kwargs.get("reverb_damping", 0.5), wet_level=kwargs.get("reverb_wet_level", 0.33), dry_level=kwargs.get("reverb_dry_level", 0.4), width=kwargs.get("reverb_width", 1.0), freeze_mode=kwargs.get("reverb_freeze_mode", 0), ) board.append(reverb) if kwargs.get("pitch_shift", False): pitch_shift = PitchShift(semitones=kwargs.get("pitch_shift_semitones", 0)) board.append(pitch_shift) if kwargs.get("limiter", False): limiter = Limiter( threshold_db=kwargs.get("limiter_threshold", -6), release_ms=kwargs.get("limiter_release", 0.05), ) board.append(limiter) if kwargs.get("gain", False): gain = Gain(gain_db=kwargs.get("gain_db", 0)) board.append(gain) if kwargs.get("distortion", False): distortion = Distortion(drive_db=kwargs.get("distortion_gain", 25)) board.append(distortion) if kwargs.get("chorus", False): chorus = Chorus( rate_hz=kwargs.get("chorus_rate", 1.0), depth=kwargs.get("chorus_depth", 0.25), centre_delay_ms=kwargs.get("chorus_delay", 7), feedback=kwargs.get("chorus_feedback", 0.0), mix=kwargs.get("chorus_mix", 0.5), ) board.append(chorus) if kwargs.get("bitcrush", False): bitcrush = Bitcrush(bit_depth=kwargs.get("bitcrush_bit_depth", 8)) board.append(bitcrush) if kwargs.get("clipping", False): clipping = Clipping(threshold_db=kwargs.get("clipping_threshold", 0)) board.append(clipping) if kwargs.get("compressor", False): compressor = Compressor( threshold_db=kwargs.get("compressor_threshold", 0), ratio=kwargs.get("compressor_ratio", 1), attack_ms=kwargs.get("compressor_attack", 1.0), release_ms=kwargs.get("compressor_release", 100), ) board.append(compressor) if kwargs.get("delay", False): delay = Delay( delay_seconds=kwargs.get("delay_seconds", 0.5), feedback=kwargs.get("delay_feedback", 0.0), mix=kwargs.get("delay_mix", 0.5), ) board.append(delay) return board(audio_input, sample_rate) def convert_audio( self, audio_input_path: str, audio_output_path: str, model_path: str, index_path: str, pitch: int = 0, f0_file: str = None, f0_method: str = "rmvpe", index_rate: float = 0.75, volume_envelope: float = 1, protect: float = 0.5, hop_length: int = 128, split_audio: bool = False, f0_autotune: bool = False, f0_autotune_strength: float = 1, filter_radius: int = 3, embedder_model: str = "contentvec", embedder_model_custom: str = None, clean_audio: bool = False, clean_strength: float = 0.5, export_format: str = "WAV", upscale_audio: bool = False, post_process: bool = False, resample_sr: int = 0, sid: int = 0, **kwargs, ): """ Performs voice conversion on the input audio. Args: pitch (int): Key for F0 up-sampling. filter_radius (int): Radius for filtering. index_rate (float): Rate for index matching. volume_envelope (int): RMS mix rate. protect (float): Protection rate for certain audio segments. hop_length (int): Hop length for audio processing. f0_method (str): Method for F0 extraction. audio_input_path (str): Path to the input audio file. audio_output_path (str): Path to the output audio file. model_path (str): Path to the voice conversion model. index_path (str): Path to the index file. split_audio (bool): Whether to split the audio for processing. f0_autotune (bool): Whether to use F0 autotune. clean_audio (bool): Whether to clean the audio. clean_strength (float): Strength of the audio cleaning. export_format (str): Format for exporting the audio. upscale_audio (bool): Whether to upscale the audio. f0_file (str): Path to the F0 file. embedder_model (str): Path to the embedder model. embedder_model_custom (str): Path to the custom embedder model. resample_sr (int, optional): Resample sampling rate. Default is 0. sid (int, optional): Speaker ID. Default is 0. **kwargs: Additional keyword arguments. """ self.get_vc(model_path, sid) try: start_time = time.time() print(f"Converting audio '{audio_input_path}'...") if upscale_audio == True: from audio_upscaler import upscale upscale(audio_input_path, audio_input_path) audio = load_audio_infer( audio_input_path, 16000, **kwargs, ) audio_max = np.abs(audio).max() / 0.95 if audio_max > 1: audio /= audio_max if not self.hubert_model or embedder_model != self.last_embedder_model: self.load_hubert(embedder_model, embedder_model_custom) self.last_embedder_model = embedder_model file_index = ( index_path.strip() .strip('"') .strip("\n") .strip('"') .strip() .replace("trained", "added") ) if self.tgt_sr != resample_sr >= 16000: self.tgt_sr = resample_sr if split_audio: chunks, intervals = process_audio(audio, 16000) print(f"Audio split into {len(chunks)} chunks for processing.") else: chunks = [] chunks.append(audio) converted_chunks = [] for c in chunks: audio_opt = self.vc.pipeline( model=self.hubert_model, net_g=self.net_g, sid=sid, audio=c, pitch=pitch, f0_method=f0_method, file_index=file_index, index_rate=index_rate, pitch_guidance=self.use_f0, filter_radius=filter_radius, volume_envelope=volume_envelope, version=self.version, protect=protect, hop_length=hop_length, f0_autotune=f0_autotune, f0_autotune_strength=f0_autotune_strength, f0_file=f0_file, ) converted_chunks.append(audio_opt) if split_audio: print(f"Converted audio chunk {len(converted_chunks)}") if split_audio: audio_opt = merge_audio(converted_chunks, intervals, 16000, self.tgt_sr) else: audio_opt = converted_chunks[0] if clean_audio: cleaned_audio = self.remove_audio_noise( audio_opt, self.tgt_sr, clean_strength ) if cleaned_audio is not None: audio_opt = cleaned_audio if post_process: audio_opt = self.post_process_audio( audio_input=audio_opt, sample_rate=self.tgt_sr, **kwargs, ) sf.write(audio_output_path, audio_opt, self.tgt_sr, format="WAV") output_path_format = audio_output_path.replace( ".wav", f".{export_format.lower()}" ) audio_output_path = self.convert_audio_format( audio_output_path, output_path_format, export_format ) elapsed_time = time.time() - start_time print( f"Conversion completed at '{audio_output_path}' in {elapsed_time:.2f} seconds." ) except Exception as error: print(f"An error occurred during audio conversion: {error}") print(traceback.format_exc()) def convert_audio_batch( self, audio_input_paths: str, audio_output_path: str, **kwargs, ): """ Performs voice conversion on a batch of input audio files. Args: audio_input_paths (str): List of paths to the input audio files. audio_output_path (str): Path to the output audio file. resample_sr (int, optional): Resample sampling rate. Default is 0. sid (int, optional): Speaker ID. Default is 0. **kwargs: Additional keyword arguments. """ pid = os.getpid() try: with open( os.path.join(now_dir, "assets", "infer_pid.txt"), "w" ) as pid_file: pid_file.write(str(pid)) start_time = time.time() print(f"Converting audio batch '{audio_input_paths}'...") audio_files = [ f for f in os.listdir(audio_input_paths) if f.endswith( ( "wav", "mp3", "flac", "ogg", "opus", "m4a", "mp4", "aac", "alac", "wma", "aiff", "webm", "ac3", ) ) ] print(f"Detected {len(audio_files)} audio files for inference.") for a in audio_files: new_input = os.path.join(audio_input_paths, a) new_output = os.path.splitext(a)[0] + "_output.wav" new_output = os.path.join(audio_output_path, new_output) if os.path.exists(new_output): continue self.convert_audio( audio_input_path=new_input, audio_output_path=new_output, **kwargs, ) print(f"Conversion completed at '{audio_input_paths}'.") elapsed_time = time.time() - start_time print(f"Batch conversion completed in {elapsed_time:.2f} seconds.") except Exception as error: print(f"An error occurred during audio batch conversion: {error}") print(traceback.format_exc()) finally: os.remove(os.path.join(now_dir, "assets", "infer_pid.txt")) def get_vc(self, weight_root, sid): """ Loads the voice conversion model and sets up the pipeline. Args: weight_root (str): Path to the model weights. sid (int): Speaker ID. """ if sid == "" or sid == []: self.cleanup_model() if torch.cuda.is_available(): torch.cuda.empty_cache() if not self.loaded_model or self.loaded_model != weight_root: self.load_model(weight_root) if self.cpt is not None: self.setup_network() self.setup_vc_instance() self.loaded_model = weight_root def cleanup_model(self): """ Cleans up the model and releases resources. """ if self.hubert_model is not None: del self.net_g, self.n_spk, self.vc, self.hubert_model, self.tgt_sr self.hubert_model = self.net_g = self.n_spk = self.vc = self.tgt_sr = None if torch.cuda.is_available(): torch.cuda.empty_cache() del self.net_g, self.cpt if torch.cuda.is_available(): torch.cuda.empty_cache() self.cpt = None def load_model(self, weight_root): """ Loads the model weights from the specified path. Args: weight_root (str): Path to the model weights. """ self.cpt = ( torch.load(weight_root, map_location="cpu") if os.path.isfile(weight_root) else None ) def setup_network(self): """ Sets up the network configuration based on the loaded checkpoint. """ if self.cpt is not None: self.tgt_sr = self.cpt["config"][-1] self.cpt["config"][-3] = self.cpt["weight"]["emb_g.weight"].shape[0] self.use_f0 = self.cpt.get("f0", 1) self.version = self.cpt.get("version", "v1") self.text_enc_hidden_dim = 768 if self.version == "v2" else 256 self.net_g = Synthesizer( *self.cpt["config"], use_f0=self.use_f0, text_enc_hidden_dim=self.text_enc_hidden_dim, is_half=self.config.is_half, ) del self.net_g.enc_q self.net_g.load_state_dict(self.cpt["weight"], strict=False) self.net_g.eval().to(self.config.device) self.net_g = ( self.net_g.half() if self.config.is_half else self.net_g.float() ) def setup_vc_instance(self): """ Sets up the voice conversion pipeline instance based on the target sampling rate and configuration. """ if self.cpt is not None: self.vc = VC(self.tgt_sr, self.config) self.n_spk = self.cpt["config"][-3]