import torch import soundfile as sf import numpy as np import argparse import os import yaml import julius import sys currentdir = os.path.dirname(os.path.realpath(__file__)) sys.path.append(os.path.dirname(currentdir)) from networks import Dasp_Mastering_Style_Transfer, Effects_Encoder from modules.loss import AudioFeatureLoss, Loss def convert_audio(wav: torch.Tensor, from_rate: float, to_rate: float, to_channels: int) -> torch.Tensor: """Convert audio to new sample rate and number of audio channels. """ wav = julius.resample_frac(wav, int(from_rate), int(to_rate)) wav = convert_audio_channels(wav, to_channels) return wav class MasteringStyleTransfer: def __init__(self, args): self.args = args self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # Load models self.effects_encoder = self.load_effects_encoder() self.mastering_converter = self.load_mastering_converter() def load_effects_encoder(self): effects_encoder = Effects_Encoder(self.args.cfg_enc) reload_weights(effects_encoder, self.args.encoder_path, self.device) effects_encoder.to(self.device) effects_encoder.eval() return effects_encoder def load_mastering_converter(self): mastering_converter = Dasp_Mastering_Style_Transfer(num_features=2048, sample_rate=self.args.sample_rate, tgt_fx_names=['eq', 'distortion', 'multiband_comp', 'gain', 'imager', 'limiter'], model_type='tcn', config=self.args.cfg_converter, batch_size=1) reload_weights(mastering_converter, self.args.model_path, self.device) mastering_converter.to(self.device) mastering_converter.eval() return mastering_converter def get_reference_embedding(self, reference_tensor): with torch.no_grad(): reference_feature = self.effects_encoder(reference_tensor) return reference_feature def mastering_style_transfer(self, input_tensor, reference_feature): with torch.no_grad(): output_audio = self.mastering_converter(input_tensor, reference_feature) predicted_params = self.mastering_converter.get_last_predicted_params() return output_audio, predicted_params # def inference_time_optimization(self, input_tensor, reference_tensor, ito_config, initial_reference_feature): # fit_embedding = torch.nn.Parameter(initial_reference_feature) # optimizer = getattr(torch.optim, ito_config['optimizer'])([fit_embedding], lr=ito_config['learning_rate']) # af_loss = AudioFeatureLoss( # weights=ito_config['af_weights'], # sample_rate=ito_config['sample_rate'], # stem_separation=False, # use_clap=False # ) # min_loss = float('inf') # min_loss_step = 0 # min_loss_output = None # min_loss_params = None # min_loss_embedding = None # loss_history = [] # divergence_counter = 0 # ito_log = [] # for step in range(ito_config['num_steps']): # optimizer.zero_grad() # output_audio = self.mastering_converter(input_tensor, fit_embedding) # current_params = self.mastering_converter.get_last_predicted_params() # losses = af_loss(output_audio, reference_tensor) # total_loss = sum(losses.values()) # loss_history.append(total_loss.item()) # if total_loss < min_loss: # min_loss = total_loss.item() # min_loss_step = step # min_loss_output = output_audio.detach() # min_loss_params = current_params # min_loss_embedding = fit_embedding.detach().clone() # # Check for divergence # if len(loss_history) > 10 and total_loss > loss_history[-11]: # divergence_counter += 1 # else: # divergence_counter = 0 # # Log top 5 parameter differences # if step == 0: # initial_params = current_params # top_5_diff = self.get_top_n_diff_string(initial_params, current_params, top_n=5) # log_entry = f"Step {step + 1}\n Loss: {total_loss.item():.4f}\n{top_5_diff}\n" # if divergence_counter >= 10: # print(f"Optimization stopped early due to divergence at step {step}") # break # total_loss.backward() # optimizer.step() # yield log_entry, output_audio.detach(), current_params, step + 1, total_loss.item() # return min_loss_output, min_loss_params, min_loss_embedding, min_loss_step + 1 def inference_time_optimization(self, input_tensor, reference_tensor, ito_config, initial_reference_feature): fit_embedding = torch.nn.Parameter(initial_reference_feature) optimizer = getattr(torch.optim, ito_config['optimizer'])([fit_embedding], lr=ito_config['learning_rate']) af_loss = AudioFeatureLoss( weights=ito_config['af_weights'], sample_rate=ito_config['sample_rate'], stem_separation=False, use_clap=False ) min_loss = float('inf') min_loss_step = 0 all_results = [] for step in range(ito_config['num_steps']): optimizer.zero_grad() output_audio = self.mastering_converter(input_tensor, fit_embedding) current_params = self.mastering_converter.get_last_predicted_params() losses = af_loss(output_audio, reference_tensor) total_loss = sum(losses.values()) if total_loss < min_loss: min_loss = total_loss.item() min_loss_step = step # Log top 5 parameter differences if step == 0: initial_params = current_params top_5_diff = self.get_top_n_diff_string(initial_params, current_params, top_n=5) log_entry = f"Step {step + 1}\n Loss: {total_loss.item():.4f}\n{top_5_diff}\n" all_results.append({ 'step': step + 1, 'loss': total_loss.item(), 'audio': output_audio.detach(), 'params': current_params, 'log': log_entry }) total_loss.backward() optimizer.step() # yield all_results[-1] return all_results, min_loss_step def preprocess_audio(self, audio, target_sample_rate=44100): sample_rate, data = audio # Normalize audio to -1 to 1 range if data.dtype == np.int16: data = data.astype(np.float32) / 32768.0 elif data.dtype == np.float32: data = np.clip(data, -1.0, 1.0) else: raise ValueError(f"Unsupported audio data type: {data.dtype}") # Ensure stereo channels if data.ndim == 1: data = np.stack([data, data]) elif data.ndim == 2: if data.shape[0] == 2: pass # Already in correct shape elif data.shape[1] == 2: data = data.T else: data = np.stack([data[:, 0], data[:, 0]]) # Duplicate mono channel else: raise ValueError(f"Unsupported audio shape: {data.shape}") # Convert to torch tensor data_tensor = torch.FloatTensor(data).unsqueeze(0) # Resample if necessary if sample_rate != target_sample_rate: data_tensor = julius.resample_frac(data_tensor, sample_rate, target_sample_rate) return data_tensor.to(self.device) def process_audio(self, input_audio, reference_audio, ito_reference_audio): input_tensor = self.preprocess_audio(input_audio, self.args.sample_rate) reference_tensor = self.preprocess_audio(reference_audio, self.args.sample_rate) ito_reference_tensor = self.preprocess_audio(ito_reference_audio, self.args.sample_rate) reference_feature = self.get_reference_embedding(reference_tensor) output_audio, predicted_params = self.mastering_style_transfer(input_tensor, reference_feature) return output_audio, predicted_params, self.args.sample_rate def print_predicted_params(self, predicted_params): if predicted_params is None: print("No predicted parameters available.") return print("Predicted Parameters:") for fx_name, fx_params in predicted_params.items(): print(f"\n{fx_name.upper()}:") if isinstance(fx_params, dict): for param_name, param_value in fx_params.items(): if isinstance(param_value, torch.Tensor): param_value = param_value.detach().cpu().numpy() print(f" {param_name}: {param_value}") elif isinstance(fx_params, torch.Tensor): param_value = fx_params.detach().cpu().numpy() print(f" {param_value}") else: print(f" {fx_params}") def get_param_output_string(self, params): if params is None: return "No parameters available" output = [] for fx_name, fx_params in params.items(): output.append(f"{fx_name.upper()}:") if isinstance(fx_params, dict): for param_name, param_value in fx_params.items(): if isinstance(param_value, torch.Tensor): param_value = param_value.item() output.append(f" {param_name}: {param_value:.4f}") elif isinstance(fx_params, torch.Tensor): output.append(f" {fx_params.item():.4f}") else: output.append(f" {fx_params:.4f}") return "\n".join(output) def get_top_n_diff_string(self, initial_params, ito_params, top_n=5): if initial_params is None or ito_params is None: return "Cannot compare parameters" all_diffs = [] for fx_name in initial_params.keys(): if isinstance(initial_params[fx_name], dict): for param_name in initial_params[fx_name].keys(): initial_value = initial_params[fx_name][param_name] ito_value = ito_params[fx_name][param_name] param_range = self.mastering_converter.fx_processors[fx_name].param_ranges[param_name] normalized_diff = abs((ito_value - initial_value) / (param_range[1] - param_range[0])) all_diffs.append((fx_name, param_name, initial_value.item(), ito_value.item(), normalized_diff.item())) else: initial_value = initial_params[fx_name] ito_value = ito_params[fx_name] normalized_diff = abs(ito_value - initial_value) all_diffs.append((fx_name, 'width', initial_value.item(), ito_value.item(), normalized_diff.item())) top_diffs = sorted(all_diffs, key=lambda x: x[4], reverse=True)[:top_n] output = [f" Top {top_n} parameter differences (initial / ITO / normalized diff):"] for fx_name, param_name, initial_value, ito_value, normalized_diff in top_diffs: output.append(f" {fx_name.upper()} - {param_name}: {initial_value:.3f} / {ito_value:.3f} / {normalized_diff:.3f}") return "\n".join(output) def reload_weights(model, ckpt_path, device): checkpoint = torch.load(ckpt_path, map_location=device) from collections import OrderedDict new_state_dict = OrderedDict() for k, v in checkpoint["model"].items(): name = k[7:] # remove `module.` new_state_dict[name] = v model.load_state_dict(new_state_dict, strict=False)