jadechoghari's picture
add model
9b9e0ee verified
from email.policy import strict
import torch
import os
import pytorch_lightning as pl
import torch.nn.functional as F
from contextlib import contextmanager
import numpy as np
from qa_mdt.audioldm_train.modules.diffusionmodules.ema import *
from torch.optim.lr_scheduler import LambdaLR
from qa_mdt.audioldm_train.modules.diffusionmodules.model import Encoder, Decoder
from qa_mdt.audioldm_train.modules.diffusionmodules.distributions import (
DiagonalGaussianDistribution,
)
import wandb
from qa_mdt.audioldm_train.utilities.model_util import instantiate_from_config
import soundfile as sf
from qa_mdt.audioldm_train.utilities.model_util import get_vocoder
from qa_mdt.audioldm_train.utilities.tools import synth_one_sample
import itertools
class AutoencoderKL(pl.LightningModule):
def __init__(
self,
ddconfig=None,
lossconfig=None,
batchsize=None,
embed_dim=None,
time_shuffle=1,
subband=1,
sampling_rate=16000,
ckpt_path=None,
reload_from_ckpt=None,
ignore_keys=[],
image_key="fbank",
colorize_nlabels=None,
monitor=None,
base_learning_rate=1e-5,
):
super().__init__()
self.automatic_optimization = False
assert (
"mel_bins" in ddconfig.keys()
), "mel_bins is not specified in the Autoencoder config"
num_mel = ddconfig["mel_bins"]
self.image_key = image_key
self.sampling_rate = sampling_rate
self.encoder = Encoder(**ddconfig)
self.decoder = Decoder(**ddconfig)
self.loss = instantiate_from_config(lossconfig)
self.subband = int(subband)
if self.subband > 1:
print("Use subband decomposition %s" % self.subband)
assert ddconfig["double_z"]
self.quant_conv = torch.nn.Conv2d(2 * ddconfig["z_channels"], 2 * embed_dim, 1)
self.post_quant_conv = torch.nn.Conv2d(embed_dim, ddconfig["z_channels"], 1)
if self.image_key == "fbank":
self.vocoder = get_vocoder(None, "cpu", num_mel)
self.embed_dim = embed_dim
if colorize_nlabels is not None:
assert type(colorize_nlabels) == int
self.register_buffer("colorize", torch.randn(3, colorize_nlabels, 1, 1))
if monitor is not None:
self.monitor = monitor
if ckpt_path is not None:
self.init_from_ckpt(ckpt_path, ignore_keys=ignore_keys)
self.learning_rate = float(base_learning_rate)
print("Initial learning rate %s" % self.learning_rate)
self.time_shuffle = time_shuffle
self.reload_from_ckpt = reload_from_ckpt
self.reloaded = False
self.mean, self.std = None, None
self.feature_cache = None
self.flag_first_run = True
self.train_step = 0
self.logger_save_dir = None
self.logger_exp_name = None
self.logger_exp_group_name = None
if not self.reloaded and self.reload_from_ckpt is not None:
# import pdb
# pdb.set_trace()
print("--> Reload weight of autoencoder from %s" % self.reload_from_ckpt)
checkpoint = torch.load(self.reload_from_ckpt)
load_todo_keys = {}
pretrained_state_dict = checkpoint["state_dict"]
current_state_dict = self.state_dict()
for key in current_state_dict:
if (
key in pretrained_state_dict.keys()
and pretrained_state_dict[key].size()
== current_state_dict[key].size()
):
load_todo_keys[key] = pretrained_state_dict[key]
else:
print("Key %s mismatch during loading, seems fine" % key)
self.load_state_dict(load_todo_keys, strict=False)
self.reloaded = True
else:
print("Train from scratch")
def get_log_dir(self):
return os.path.join(
self.logger_save_dir, self.logger_exp_group_name, self.logger_exp_name
)
def set_log_dir(self, save_dir, exp_group_name, exp_name):
self.logger_save_dir = save_dir
self.logger_exp_name = exp_name
self.logger_exp_group_name = exp_group_name
def init_from_ckpt(self, path, ignore_keys=list()):
sd = torch.load(path, map_location="cpu")["state_dict"]
keys = list(sd.keys())
for k in keys:
for ik in ignore_keys:
if k.startswith(ik):
print("Deleting key {} from state_dict.".format(k))
del sd[k]
self.load_state_dict(sd, strict=False)
print(f"Restored from {path}")
def encode(self, x):
# x = self.time_shuffle_operation(x)
x = self.freq_split_subband(x)
h = self.encoder(x)
moments = self.quant_conv(h)
posterior = DiagonalGaussianDistribution(moments)
return posterior
def decode(self, z):
z = self.post_quant_conv(z)
dec = self.decoder(z)
# bs, ch, shuffled_timesteps, fbins = dec.size()
# dec = self.time_unshuffle_operation(dec, bs, int(ch*shuffled_timesteps), fbins)
dec = self.freq_merge_subband(dec)
return dec
def decode_to_waveform(self, dec):
from qa_mdt.audioldm_train.utilities.model_util import vocoder_infer
if self.image_key == "fbank":
dec = dec.squeeze(1).permute(0, 2, 1)
wav_reconstruction = vocoder_infer(dec, self.vocoder)
elif self.image_key == "stft":
dec = dec.squeeze(1).permute(0, 2, 1)
wav_reconstruction = self.wave_decoder(dec)
return wav_reconstruction
def visualize_latent(self, input):
import matplotlib.pyplot as plt
# for i in range(10):
# zero_input = torch.zeros_like(input) - 11.59
# zero_input[:,:,i * 16: i * 16 + 16,:16] += 13.59
# posterior = self.encode(zero_input)
# latent = posterior.sample()
# avg_latent = torch.mean(latent, dim=1)[0]
# plt.imshow(avg_latent.cpu().detach().numpy().T)
# plt.savefig("%s.png" % i)
# plt.close()
np.save("input.npy", input.cpu().detach().numpy())
# zero_input = torch.zeros_like(input) - 11.59
time_input = input.clone()
time_input[:, :, :, :32] *= 0
time_input[:, :, :, :32] -= 11.59
np.save("time_input.npy", time_input.cpu().detach().numpy())
posterior = self.encode(time_input)
latent = posterior.sample()
np.save("time_latent.npy", latent.cpu().detach().numpy())
avg_latent = torch.mean(latent, dim=1)
for i in range(avg_latent.size(0)):
plt.imshow(avg_latent[i].cpu().detach().numpy().T)
plt.savefig("freq_%s.png" % i)
plt.close()
freq_input = input.clone()
freq_input[:, :, :512, :] *= 0
freq_input[:, :, :512, :] -= 11.59
np.save("freq_input.npy", freq_input.cpu().detach().numpy())
posterior = self.encode(freq_input)
latent = posterior.sample()
np.save("freq_latent.npy", latent.cpu().detach().numpy())
avg_latent = torch.mean(latent, dim=1)
for i in range(avg_latent.size(0)):
plt.imshow(avg_latent[i].cpu().detach().numpy().T)
plt.savefig("time_%s.png" % i)
plt.close()
def forward(self, input, sample_posterior=True):
posterior = self.encode(input)
if sample_posterior:
z = posterior.sample()
else:
z = posterior.mode()
if self.flag_first_run:
print("Latent size: ", z.size())
self.flag_first_run = False
dec = self.decode(z)
return dec, posterior
def get_input(self, batch):
fname, text, label_indices, waveform, stft, fbank = (
batch["fname"],
batch["text"],
batch["label_vector"],
batch["waveform"],
batch["stft"],
batch["log_mel_spec"],
)
# if(self.time_shuffle != 1):
# if(fbank.size(1) % self.time_shuffle != 0):
# pad_len = self.time_shuffle - (fbank.size(1) % self.time_shuffle)
# fbank = torch.nn.functional.pad(fbank, (0,0,0,pad_len))
ret = {}
ret["fbank"], ret["stft"], ret["fname"], ret["waveform"] = (
fbank.unsqueeze(1),
stft.unsqueeze(1),
fname,
waveform.unsqueeze(1),
)
return ret
# def time_shuffle_operation(self, fbank):
# if(self.time_shuffle == 1):
# return fbank
# shuffled_fbank = []
# for i in range(self.time_shuffle):
# shuffled_fbank.append(fbank[:,:, i::self.time_shuffle,:])
# return torch.cat(shuffled_fbank, dim=1)
# def time_unshuffle_operation(self, shuffled_fbank, bs, timesteps, fbins):
# if(self.time_shuffle == 1):
# return shuffled_fbank
# buffer = torch.zeros((bs, 1, timesteps, fbins)).to(shuffled_fbank.device)
# for i in range(self.time_shuffle):
# buffer[:,0,i::self.time_shuffle,:] = shuffled_fbank[:,i,:,:]
# return buffer
def freq_split_subband(self, fbank):
if self.subband == 1 or self.image_key != "stft":
return fbank
bs, ch, tstep, fbins = fbank.size()
assert fbank.size(-1) % self.subband == 0
assert ch == 1
return (
fbank.squeeze(1)
.reshape(bs, tstep, self.subband, fbins // self.subband)
.permute(0, 2, 1, 3)
)
def freq_merge_subband(self, subband_fbank):
if self.subband == 1 or self.image_key != "stft":
return subband_fbank
assert subband_fbank.size(1) == self.subband # Channel dimension
bs, sub_ch, tstep, fbins = subband_fbank.size()
return subband_fbank.permute(0, 2, 1, 3).reshape(bs, tstep, -1).unsqueeze(1)
def training_step(self, batch, batch_idx):
g_opt, d_opt = self.optimizers()
inputs_dict = self.get_input(batch)
inputs = inputs_dict[self.image_key]
waveform = inputs_dict["waveform"]
if batch_idx % 5000 == 0 and self.local_rank == 0:
print("Log train image")
self.log_images(inputs, waveform=waveform)
reconstructions, posterior = self(inputs)
if self.image_key == "stft":
rec_waveform = self.decode_to_waveform(reconstructions)
else:
rec_waveform = None
# train the discriminator
# If working on waveform, inputs is STFT, reconstructions are the waveform
# If working on the melspec, inputs is melspec, reconstruction are also mel spec
discloss, log_dict_disc = self.loss(
inputs=inputs,
reconstructions=reconstructions,
posteriors=posterior,
waveform=waveform,
rec_waveform=rec_waveform,
optimizer_idx=1,
global_step=self.global_step,
last_layer=self.get_last_layer(),
split="train",
)
self.log(
"discloss",
discloss,
prog_bar=True,
logger=True,
on_step=True,
on_epoch=True,
)
self.log_dict(
log_dict_disc, prog_bar=False, logger=True, on_step=True, on_epoch=False
)
d_opt.zero_grad()
self.manual_backward(discloss)
d_opt.step()
self.log(
"train_step",
self.train_step,
prog_bar=False,
logger=False,
on_step=True,
on_epoch=False,
)
self.log(
"global_step",
float(self.global_step),
prog_bar=True,
logger=True,
on_step=True,
on_epoch=False,
)
aeloss, log_dict_ae = self.loss(
inputs=inputs,
reconstructions=reconstructions,
posteriors=posterior,
waveform=waveform,
rec_waveform=rec_waveform,
optimizer_idx=0,
global_step=self.global_step,
last_layer=self.get_last_layer(),
split="train",
)
self.log(
"aeloss",
aeloss,
prog_bar=True,
logger=True,
on_step=True,
on_epoch=False,
)
self.log(
"posterior_std",
torch.mean(posterior.var),
prog_bar=True,
logger=True,
on_step=True,
on_epoch=False,
)
self.log_dict(
log_dict_ae, prog_bar=True, logger=True, on_step=True, on_epoch=False
)
self.train_step += 1
g_opt.zero_grad()
self.manual_backward(aeloss)
g_opt.step()
def validation_step(self, batch, batch_idx):
inputs_dict = self.get_input(batch)
inputs = inputs_dict[self.image_key]
waveform = inputs_dict["waveform"]
if batch_idx <= 3:
print("Log val image")
self.log_images(inputs, train=False, waveform=waveform)
reconstructions, posterior = self(inputs)
if self.image_key == "stft":
rec_waveform = self.decode_to_waveform(reconstructions)
else:
rec_waveform = None
aeloss, log_dict_ae = self.loss(
inputs=inputs,
reconstructions=reconstructions,
posteriors=posterior,
waveform=waveform,
rec_waveform=rec_waveform,
optimizer_idx=0,
global_step=self.global_step,
last_layer=self.get_last_layer(),
split="val",
)
discloss, log_dict_disc = self.loss(
inputs=inputs,
reconstructions=reconstructions,
posteriors=posterior,
waveform=waveform,
rec_waveform=rec_waveform,
optimizer_idx=1,
global_step=self.global_step,
last_layer=self.get_last_layer(),
split="val",
)
self.log_dict(log_dict_ae)
self.log_dict(log_dict_disc)
return self.log_dict
def test_step(self, batch, batch_idx):
inputs_dict = self.get_input(batch)
inputs = inputs_dict[self.image_key]
waveform = inputs_dict["waveform"]
fnames = inputs_dict["fname"]
reconstructions, posterior = self(inputs)
save_path = os.path.join(
self.get_log_dir(), "autoencoder_result_audiocaps", str(self.global_step)
)
if self.image_key == "stft":
wav_prediction = self.decode_to_waveform(reconstructions)
wav_original = waveform
self.save_wave(
wav_prediction, fnames, os.path.join(save_path, "stft_wav_prediction")
)
else:
wav_vocoder_gt, wav_prediction = synth_one_sample(
inputs.squeeze(1),
reconstructions.squeeze(1),
labels="validation",
vocoder=self.vocoder,
)
self.save_wave(
wav_vocoder_gt, fnames, os.path.join(save_path, "fbank_vocoder_gt_wave")
)
self.save_wave(
wav_prediction, fnames, os.path.join(save_path, "fbank_wav_prediction")
)
def save_wave(self, batch_wav, fname, save_dir):
os.makedirs(save_dir, exist_ok=True)
for wav, name in zip(batch_wav, fname):
name = os.path.basename(name)
sf.write(os.path.join(save_dir, name), wav, samplerate=self.sampling_rate)
def configure_optimizers(self):
lr = self.learning_rate
params = (
list(self.encoder.parameters())
+ list(self.decoder.parameters())
+ list(self.quant_conv.parameters())
+ list(self.post_quant_conv.parameters())
)
if self.image_key == "stft":
params += list(self.wave_decoder.parameters())
opt_ae = torch.optim.Adam(params, lr=lr, betas=(0.5, 0.9))
if self.image_key == "fbank":
disc_params = self.loss.discriminator.parameters()
elif self.image_key == "stft":
disc_params = itertools.chain(
self.loss.msd.parameters(), self.loss.mpd.parameters()
)
opt_disc = torch.optim.Adam(disc_params, lr=lr, betas=(0.5, 0.9))
return [opt_ae, opt_disc], []
def get_last_layer(self):
return self.decoder.conv_out.weight
@torch.no_grad()
def log_images(self, batch, train=True, only_inputs=False, waveform=None, **kwargs):
log = dict()
x = batch.to(self.device)
if not only_inputs:
xrec, posterior = self(x)
log["samples"] = self.decode(posterior.sample())
log["reconstructions"] = xrec
log["inputs"] = x
wavs = self._log_img(log, train=train, index=0, waveform=waveform)
return wavs
def _log_img(self, log, train=True, index=0, waveform=None):
images_input = self.tensor2numpy(log["inputs"][index, 0]).T
images_reconstruct = self.tensor2numpy(log["reconstructions"][index, 0]).T
images_samples = self.tensor2numpy(log["samples"][index, 0]).T
if train:
name = "train"
else:
name = "val"
if self.logger is not None:
self.logger.log_image(
"img_%s" % name,
[images_input, images_reconstruct, images_samples],
caption=["input", "reconstruct", "samples"],
)
inputs, reconstructions, samples = (
log["inputs"],
log["reconstructions"],
log["samples"],
)
if self.image_key == "fbank":
wav_original, wav_prediction = synth_one_sample(
inputs[index],
reconstructions[index],
labels="validation",
vocoder=self.vocoder,
)
wav_original, wav_samples = synth_one_sample(
inputs[index], samples[index], labels="validation", vocoder=self.vocoder
)
wav_original, wav_samples, wav_prediction = (
wav_original[0],
wav_samples[0],
wav_prediction[0],
)
elif self.image_key == "stft":
wav_prediction = (
self.decode_to_waveform(reconstructions)[index, 0]
.cpu()
.detach()
.numpy()
)
wav_samples = (
self.decode_to_waveform(samples)[index, 0].cpu().detach().numpy()
)
wav_original = waveform[index, 0].cpu().detach().numpy()
if self.logger is not None:
self.logger.experiment.log(
{
"original_%s"
% name: wandb.Audio(
wav_original, caption="original", sample_rate=self.sampling_rate
),
"reconstruct_%s"
% name: wandb.Audio(
wav_prediction,
caption="reconstruct",
sample_rate=self.sampling_rate,
),
"samples_%s"
% name: wandb.Audio(
wav_samples, caption="samples", sample_rate=self.sampling_rate
),
}
)
return wav_original, wav_prediction, wav_samples
def tensor2numpy(self, tensor):
return tensor.cpu().detach().numpy()
def to_rgb(self, x):
assert self.image_key == "segmentation"
if not hasattr(self, "colorize"):
self.register_buffer("colorize", torch.randn(3, x.shape[1], 1, 1).to(x))
x = F.conv2d(x, weight=self.colorize)
x = 2.0 * (x - x.min()) / (x.max() - x.min()) - 1.0
return x
class IdentityFirstStage(torch.nn.Module):
def __init__(self, *args, vq_interface=False, **kwargs):
self.vq_interface = vq_interface # TODO: Should be true by default but check to not break older stuff
super().__init__()
def encode(self, x, *args, **kwargs):
return x
def decode(self, x, *args, **kwargs):
return x
def quantize(self, x, *args, **kwargs):
if self.vq_interface:
return x, None, [None, None, None]
return x
def forward(self, x, *args, **kwargs):
return x