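"""Dataset classes for Make-An-Audio 3 video-to-audio training.

They pair VGGSound (and, for one variant, AudioSet) mel-spectrograms or raw
waveforms with precomputed CAVP video features, with optional concat and
spectrogram-inpainting augmentations.
"""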
import math
import os
import random

import librosa
import numpy as np
import torch
class audio_video_spec_fullset_Dataset(torch.utils.data.Dataset):
    # Audio + video dataset: mel-spectrogram npy files paired with CAVP video features.
def __init__(self, split, dataset1, feat_type='clip', transforms=None, sr=22050, duration=10, truncate=220000, fps=21.5, drop=0.0, fix_frames=False, hop_len=256):
super().__init__()
        if split == "train":
            self.split = "Train"
        elif split == "valid" or split == "test":
            self.split = "Test"
        else:
            raise ValueError(f"Unknown split: {split}")
# Default params:
self.min_duration = 2
self.sr = sr # 22050
self.duration = duration # 10
self.truncate = truncate # 220000
self.fps = fps
self.fix_frames = fix_frames
self.hop_len = hop_len
self.drop = drop
print("Fix Frames: {}".format(self.fix_frames))
print("Use Drop: {}".format(self.drop))
# Dataset1: (VGGSound)
assert dataset1.dataset_name == "VGGSound"
# spec_dir: spectrogram path
# feat_dir: CAVP feature path
# video_dir: video path
dataset1_spec_dir = os.path.join(dataset1.data_dir, "mel_maa2", "npy")
dataset1_feat_dir = os.path.join(dataset1.data_dir, "cavp")
dataset1_video_dir = os.path.join(dataset1.video_dir, "tmp_vid")
split_txt_path = dataset1.split_txt_path
with open(os.path.join(split_txt_path, '{}.txt'.format(self.split)), "r") as f:
data_list1 = f.readlines()
data_list1 = list(map(lambda x: x.strip(), data_list1))
spec_list1 = list(map(lambda x: os.path.join(dataset1_spec_dir, x) + "_mel.npy", data_list1)) # spec
feat_list1 = list(map(lambda x: os.path.join(dataset1_feat_dir, x) + ".npz", data_list1)) # feat
video_list1 = list(map(lambda x: os.path.join(dataset1_video_dir, x) + "_new_fps_21.5_truncate_0_10.0.mp4", data_list1)) # video
        # Merge data (the Test split is capped at 200 samples for quick evaluation):
self.data_list = data_list1 if self.split != "Test" else data_list1[:200]
self.spec_list = spec_list1 if self.split != "Test" else spec_list1[:200]
self.feat_list = feat_list1 if self.split != "Test" else feat_list1[:200]
self.video_list = video_list1 if self.split != "Test" else video_list1[:200]
assert len(self.data_list) == len(self.spec_list) == len(self.feat_list) == len(self.video_list)
shuffle_idx = np.random.permutation(np.arange(len(self.data_list)))
self.data_list = [self.data_list[i] for i in shuffle_idx]
self.spec_list = [self.spec_list[i] for i in shuffle_idx]
self.feat_list = [self.feat_list[i] for i in shuffle_idx]
self.video_list = [self.video_list[i] for i in shuffle_idx]
print('Split: {} Sample Num: {}'.format(split, len(self.data_list)))
def __len__(self):
return len(self.data_list)
def load_spec_and_feat(self, spec_path, video_feat_path):
"""Load audio spec and video feat"""
        try:
            spec_raw = np.load(spec_path).astype(np.float32)  # mel: [C=80, T]
        except Exception:
            print(f"corrupted mel: {spec_path}", flush=True)
            spec_raw = np.zeros((80, 625), dtype=np.float32)  # placeholder [C, T]
        # With probability `self.drop`, replace the video condition with the
        # precomputed empty-video feature (condition dropout):
        p = np.random.uniform(0, 1)
        if p > self.drop:
            try:
                video_feat = np.load(video_feat_path)['feat'].astype(np.float32)
            except Exception:
                print(f"corrupted video: {video_feat_path}", flush=True)
                video_feat = np.load(os.path.join(os.path.dirname(video_feat_path), 'empty_vid.npz'))['feat'].astype(np.float32)
        else:
            video_feat = np.load(os.path.join(os.path.dirname(video_feat_path), 'empty_vid.npz'))['feat'].astype(np.float32)
        # Pad by tiling when shorter than the target length, then crop:
        spec_len = self.sr * self.duration / self.hop_len
if spec_raw.shape[1] < spec_len:
spec_raw = np.tile(spec_raw, math.ceil(spec_len / spec_raw.shape[1]))
spec_raw = spec_raw[:, :int(spec_len)]
feat_len = self.fps * self.duration
if video_feat.shape[0] < feat_len:
video_feat = np.tile(video_feat, (math.ceil(feat_len / video_feat.shape[0]), 1))
video_feat = video_feat[:int(feat_len)]
return spec_raw, video_feat
    def mix_audio_and_feat(self, spec1=None, spec2=None, video_feat1=None, video_feat2=None, video_info_dict=None, mode='single'):
        """Return the (possibly concatenated) spec and video feat."""
        if video_info_dict is None:  # avoid a shared mutable default argument
            video_info_dict = {}
        if mode == "single":
# spec1:
if not self.fix_frames:
start_idx = random.randint(0, self.sr * self.duration - self.truncate - 1) # audio start
else:
start_idx = 0
start_frame = int(self.fps * start_idx / self.sr)
truncate_frame = int(self.fps * self.truncate / self.sr)
# Spec Start & Truncate:
spec_start = int(start_idx / self.hop_len)
spec_truncate = int(self.truncate / self.hop_len)
spec1 = spec1[:, spec_start : spec_start + spec_truncate]
video_feat1 = video_feat1[start_frame: start_frame + truncate_frame]
# info_dict:
video_info_dict['video_time1'] = str(start_frame) + '_' + str(start_frame+truncate_frame) # Start frame, end frame
video_info_dict['video_time2'] = ""
return spec1, video_feat1, video_info_dict
elif mode == "concat":
total_spec_len = int(self.truncate / self.hop_len)
            # Random truncate length:
spec1_truncate_len = random.randint(self.min_duration * self.sr // self.hop_len, total_spec_len - self.min_duration * self.sr // self.hop_len - 1)
spec2_truncate_len = total_spec_len - spec1_truncate_len
# Sample spec clip:
spec_start1 = random.randint(0, total_spec_len - spec1_truncate_len - 1)
spec_start2 = random.randint(0, total_spec_len - spec2_truncate_len - 1)
spec_end1, spec_end2 = spec_start1 + spec1_truncate_len, spec_start2 + spec2_truncate_len
# concat spec:
spec1, spec2 = spec1[:, spec_start1 : spec_end1], spec2[:, spec_start2 : spec_end2]
concat_audio_spec = np.concatenate([spec1, spec2], axis=1)
# Concat Video Feat:
start1_frame, truncate1_frame = int(self.fps * spec_start1 * self.hop_len / self.sr), int(self.fps * spec1_truncate_len * self.hop_len / self.sr)
start2_frame, truncate2_frame = int(self.fps * spec_start2 * self.hop_len / self.sr), int(self.fps * self.truncate / self.sr) - truncate1_frame
video_feat1, video_feat2 = video_feat1[start1_frame : start1_frame + truncate1_frame], video_feat2[start2_frame : start2_frame + truncate2_frame]
concat_video_feat = np.concatenate([video_feat1, video_feat2])
video_info_dict['video_time1'] = str(start1_frame) + '_' + str(start1_frame+truncate1_frame) # Start frame, end frame
video_info_dict['video_time2'] = str(start2_frame) + '_' + str(start2_frame+truncate2_frame)
return concat_audio_spec, concat_video_feat, video_info_dict
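    # Worked example of the cropping arithmetic above, under the constructor
    # defaults (sr=22050, duration=10, truncate=220000, hop_len=256, fps=21.5):
    #   spec_truncate  = int(220000 / 256)          = 859 mel frames
    #   truncate_frame = int(21.5 * 220000 / 22050) = 214 video frames
    # The (80, 512) / (32, 512) shape comments elsewhere in this file reflect a
    # different configuration (e.g. truncate=131072 gives 131072 / 256 = 512).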
def __getitem__(self, idx):
audio_name1 = self.data_list[idx]
spec_npy_path1 = self.spec_list[idx]
video_feat_path1 = self.feat_list[idx]
video_path1 = self.video_list[idx]
        # With probability 0.5, sample a second clip for concat augmentation:
        flag = False
        if random.uniform(0, 1) < 0.5:
flag = True
random_idx = idx
while random_idx == idx:
random_idx = random.randint(0, len(self.data_list)-1)
audio_name2 = self.data_list[random_idx]
spec_npy_path2 = self.spec_list[random_idx]
video_feat_path2 = self.feat_list[random_idx]
video_path2 = self.video_list[random_idx]
# Load the Spec and Feat:
spec1, video_feat1 = self.load_spec_and_feat(spec_npy_path1, video_feat_path1)
if flag:
spec2, video_feat2 = self.load_spec_and_feat(spec_npy_path2, video_feat_path2)
video_info_dict = {'audio_name1':audio_name1, 'audio_name2': audio_name2, 'video_path1': video_path1, 'video_path2': video_path2}
mix_spec, mix_video_feat, mix_info = self.mix_audio_and_feat(spec1, spec2, video_feat1, video_feat2, video_info_dict, mode='concat')
else:
video_info_dict = {'audio_name1':audio_name1, 'audio_name2': "", 'video_path1': video_path1, 'video_path2': ""}
mix_spec, mix_video_feat, mix_info = self.mix_audio_and_feat(spec1=spec1, video_feat1=video_feat1, video_info_dict=video_info_dict, mode='single')
# print("mix spec shape:", mix_spec.shape)
# print("mix video feat:", mix_video_feat.shape)
data_dict = {}
        # data_dict['mix_spec'] = mix_spec[None].repeat(3, axis=0)  # TODO: change this, otherwise it cannot fit MAA's autoencoder
data_dict['mix_spec'] = mix_spec # (80, 512)
data_dict['mix_video_feat'] = mix_video_feat # (32, 512)
data_dict['mix_info_dict'] = mix_info
return data_dict
class audio_video_spec_fullset_Dataset_Train(audio_video_spec_fullset_Dataset):
def __init__(self, dataset_cfg):
super().__init__(split='train', **dataset_cfg)
class audio_video_spec_fullset_Dataset_Valid(audio_video_spec_fullset_Dataset):
def __init__(self, dataset_cfg):
super().__init__(split='valid', **dataset_cfg)
class audio_video_spec_fullset_Dataset_Test(audio_video_spec_fullset_Dataset):
def __init__(self, dataset_cfg):
super().__init__(split='test', **dataset_cfg)
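# The sample dicts produced above collate with PyTorch's default collate_fn:
# numpy arrays become tensors and the info-dict strings batch into lists.
# A minimal usage sketch (hypothetical batch size and worker count):
#
#   from torch.utils.data import DataLoader
#   train_set = audio_video_spec_fullset_Dataset_Train(dataset_cfg)
#   loader = DataLoader(train_set, batch_size=8, shuffle=True, num_workers=4)
#   batch = next(iter(loader))
#   batch['mix_spec'].shape        # (8, 80, spec_truncate)
#   batch['mix_video_feat'].shape  # (8, truncate_frame, 512)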
class audio_video_spec_fullset_Dataset_inpaint(audio_video_spec_fullset_Dataset):
def __getitem__(self, idx):
audio_name1 = self.data_list[idx]
spec_npy_path1 = self.spec_list[idx]
video_feat_path1 = self.feat_list[idx]
video_path1 = self.video_list[idx]
# Load the Spec and Feat:
spec1, video_feat1 = self.load_spec_and_feat(spec_npy_path1, video_feat_path1)
video_info_dict = {'audio_name1': audio_name1, 'audio_name2': "", 'video_path1': video_path1, 'video_path2': ""}
mix_spec, mix_masked_spec, mix_video_feat, mix_info = self.mix_audio_and_feat(spec1=spec1, video_feat1=video_feat1, video_info_dict=video_info_dict)
# print("mix spec shape:", mix_spec.shape)
# print("mix video feat:", mix_video_feat.shape)
data_dict = {}
        # data_dict['mix_spec'] = mix_spec[None].repeat(3, axis=0)  # TODO: change this, otherwise it cannot fit MAA's autoencoder
data_dict['mix_spec'] = mix_spec # (80, 512)
data_dict['hybrid_feat'] = {'mix_video_feat': mix_video_feat, 'mix_spec': mix_masked_spec} # (32, 512)
data_dict['mix_info_dict'] = mix_info
return data_dict
    def mix_audio_and_feat(self, spec1=None, video_feat1=None, video_info_dict=None):
        """Return the full spec, a masked copy of it, and the masked video feat."""
        if video_info_dict is None:  # avoid a shared mutable default argument
            video_info_dict = {}
        # spec1:
if not self.fix_frames:
start_idx = random.randint(0, self.sr * self.duration - self.truncate - 1) # audio start
else:
start_idx = 0
start_frame = int(self.fps * start_idx / self.sr)
truncate_frame = int(self.fps * self.truncate / self.sr)
# Spec Start & Truncate:
spec_start = int(start_idx / self.hop_len)
spec_truncate = int(self.truncate / self.hop_len)
spec1 = spec1[:, spec_start: spec_start + spec_truncate]
video_feat1 = video_feat1[start_frame: start_frame + truncate_frame]
# Start masking frames:
        masked_spec = random.randint(1, int(spec_truncate * 0.5 // 16)) * 16  # multiple of 16 spec frames; masks at most 50%
masked_truncate = int(masked_spec * self.hop_len)
masked_frame = int(self.fps * masked_truncate / self.sr)
start_masked_idx = random.randint(0, self.truncate - masked_truncate - 1)
start_masked_frame = int(self.fps * start_masked_idx / self.sr)
start_masked_spec = int(start_masked_idx / self.hop_len)
        # Zero out the masked span in a copy of the spec and in the video feat:
        masked_spec1 = spec1.copy()
        masked_spec1[:, start_masked_spec:start_masked_spec + masked_spec] = 0.0
        video_feat1[start_masked_frame:start_masked_frame + masked_frame, :] = 0.0
# info_dict:
video_info_dict['video_time1'] = str(start_frame) + '_' + str(start_frame + truncate_frame) # Start frame, end frame
video_info_dict['video_time2'] = ""
return spec1, masked_spec1, video_feat1, video_info_dict
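    # Mask-size arithmetic under the defaults (truncate=220000, hop_len=256):
    # spec_truncate = 859, so int(859 * 0.5 // 16) = 26 and masked_spec is a
    # multiple of 16 in [16, 416] mel frames, i.e. up to 416 * 256 audio
    # samples are zeroed in the conditioning copy (at most ~50% of the clip).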
class audio_video_spec_fullset_Dataset_inpaint_Train(audio_video_spec_fullset_Dataset_inpaint):
def __init__(self, dataset_cfg):
super().__init__(split='train', **dataset_cfg)
class audio_video_spec_fullset_Dataset_inpaint_Valid(audio_video_spec_fullset_Dataset_inpaint):
def __init__(self, dataset_cfg):
super().__init__(split='valid', **dataset_cfg)
class audio_video_spec_fullset_Dataset_inpaint_Test(audio_video_spec_fullset_Dataset_inpaint):
def __init__(self, dataset_cfg):
super().__init__(split='test', **dataset_cfg)
class audio_Dataset(torch.utils.data.Dataset):
    # Wav-only dataset: loads raw waveforms (no video features).
def __init__(self, split, dataset1, sr=22050, duration=10, truncate=220000, debug_num=False, fix_frames=False, hop_len=256):
super().__init__()
        if split == "train":
            self.split = "Train"
        elif split == "valid" or split == "test":
            self.split = "Test"
        else:
            raise ValueError(f"Unknown split: {split}")
# Default params:
self.min_duration = 2
self.sr = sr # 22050
self.duration = duration # 10
self.truncate = truncate # 220000
self.fix_frames = fix_frames
self.hop_len = hop_len
print("Fix Frames: {}".format(self.fix_frames))
# Dataset1: (VGGSound)
assert dataset1.dataset_name == "VGGSound"
# spec_dir: spectrogram path
# dataset1_spec_dir = os.path.join(dataset1.data_dir, "codec")
dataset1_wav_dir = os.path.join(dataset1.wav_dir, "wav")
split_txt_path = dataset1.split_txt_path
with open(os.path.join(split_txt_path, '{}.txt'.format(self.split)), "r") as f:
data_list1 = f.readlines()
data_list1 = list(map(lambda x: x.strip(), data_list1))
        wav_list1 = list(map(lambda x: os.path.join(dataset1_wav_dir, x) + ".wav", data_list1))  # wav
# Merge Data:
self.data_list = data_list1
self.wav_list = wav_list1
assert len(self.data_list) == len(self.wav_list)
shuffle_idx = np.random.permutation(np.arange(len(self.data_list)))
self.data_list = [self.data_list[i] for i in shuffle_idx]
self.wav_list = [self.wav_list[i] for i in shuffle_idx]
if debug_num:
self.data_list = self.data_list[:debug_num]
self.wav_list = self.wav_list[:debug_num]
print('Split: {} Sample Num: {}'.format(split, len(self.data_list)))
def __len__(self):
return len(self.data_list)
    def load_spec_and_feat(self, wav_path):
        """Load a raw mono wav (method name kept for symmetry with the other datasets)."""
        try:
            wav_raw, _ = librosa.load(wav_path, sr=self.sr)  # mono, resampled to self.sr
        except Exception:
            print(f"corrupted wav: {wav_path}", flush=True)
            wav_raw = np.zeros((160000,), dtype=np.float32)  # placeholder [T]
wav_len = self.sr * self.duration
if wav_raw.shape[0] < wav_len:
wav_raw = np.tile(wav_raw, math.ceil(wav_len / wav_raw.shape[0]))
wav_raw = wav_raw[:int(wav_len)]
return wav_raw
    def mix_audio_and_feat(self, wav_raw1=None, video_info_dict=None, mode='single'):
        """Return the truncated wav and the info dict."""
        if video_info_dict is None:  # avoid a shared mutable default argument
            video_info_dict = {}
        if mode == "single":
            # wav1:
if not self.fix_frames:
start_idx = random.randint(0, self.sr * self.duration - self.truncate - 1) # audio start
else:
start_idx = 0
wav_start = start_idx
wav_truncate = self.truncate
wav_raw1 = wav_raw1[wav_start: wav_start + wav_truncate]
return wav_raw1, video_info_dict
elif mode == "concat":
total_spec_len = int(self.truncate / self.hop_len)
# Random Trucate len:
spec1_truncate_len = random.randint(self.min_duration * self.sr // self.hop_len, total_spec_len - self.min_duration * self.sr // self.hop_len - 1)
spec2_truncate_len = total_spec_len - spec1_truncate_len
# Sample spec clip:
spec_start1 = random.randint(0, total_spec_len - spec1_truncate_len - 1)
spec_start2 = random.randint(0, total_spec_len - spec2_truncate_len - 1)
spec_end1, spec_end2 = spec_start1 + spec1_truncate_len, spec_start2 + spec2_truncate_len
# concat spec:
return video_info_dict
def __getitem__(self, idx):
audio_name1 = self.data_list[idx]
wav_path1 = self.wav_list[idx]
        # Optionally sample a second clip for concat augmentation.
        # Disabled: the threshold is -1, so the branch never runs.
        flag = False
        if random.uniform(0, 1) < -1:
            flag = True
            random_idx = idx
            while random_idx == idx:
                random_idx = random.randint(0, len(self.data_list) - 1)
            audio_name2 = self.data_list[random_idx]
            wav_path2 = self.wav_list[random_idx]
        # Load the wav:
        wav_raw1 = self.load_spec_and_feat(wav_path1)
        if flag:  # unreachable while the branch above is disabled
            wav_raw2 = self.load_spec_and_feat(wav_path2)
            video_info_dict = {'audio_name1': audio_name1, 'audio_name2': audio_name2}
            mix_wav, mix_info = self.mix_audio_and_feat(wav_raw1=wav_raw1, video_info_dict=video_info_dict, mode='concat')
else:
video_info_dict = {'audio_name1':audio_name1, 'audio_name2': ""}
mix_wav, mix_info = self.mix_audio_and_feat(wav_raw1=wav_raw1, video_info_dict=video_info_dict, mode='single')
data_dict = {}
data_dict['mix_wav'] = mix_wav # (131072,)
data_dict['mix_info_dict'] = mix_info
return data_dict
class audio_Dataset_Train(audio_Dataset):
def __init__(self, dataset_cfg):
super().__init__(split='train', **dataset_cfg)
class audio_Dataset_Test(audio_Dataset):
def __init__(self, dataset_cfg):
super().__init__(split='test', **dataset_cfg)
class audio_Dataset_Valid(audio_Dataset):
def __init__(self, dataset_cfg):
super().__init__(split='valid', **dataset_cfg)
class video_codec_Dataset(torch.utils.data.Dataset):
    # Wav + video dataset: raw waveforms paired with CAVP video features.
def __init__(self, split, dataset1, sr=22050, duration=10, truncate=220000, fps=21.5, debug_num=False, fix_frames=False, hop_len=256):
super().__init__()
        if split == "train":
            self.split = "Train"
        elif split == "valid" or split == "test":
            self.split = "Test"
        else:
            raise ValueError(f"Unknown split: {split}")
# Default params:
self.min_duration = 2
self.fps = fps
self.sr = sr # 22050
self.duration = duration # 10
self.truncate = truncate # 220000
self.fix_frames = fix_frames
self.hop_len = hop_len
print("Fix Frames: {}".format(self.fix_frames))
# Dataset1: (VGGSound)
assert dataset1.dataset_name == "VGGSound"
# spec_dir: spectrogram path
# dataset1_spec_dir = os.path.join(dataset1.data_dir, "codec")
dataset1_feat_dir = os.path.join(dataset1.data_dir, "cavp")
dataset1_wav_dir = os.path.join(dataset1.wav_dir, "wav")
split_txt_path = dataset1.split_txt_path
with open(os.path.join(split_txt_path, '{}.txt'.format(self.split)), "r") as f:
data_list1 = f.readlines()
data_list1 = list(map(lambda x: x.strip(), data_list1))
        wav_list1 = list(map(lambda x: os.path.join(dataset1_wav_dir, x) + ".wav", data_list1))  # wav
feat_list1 = list(map(lambda x: os.path.join(dataset1_feat_dir, x) + ".npz", data_list1)) # feat
# Merge Data:
self.data_list = data_list1
self.wav_list = wav_list1
self.feat_list = feat_list1
        assert len(self.data_list) == len(self.wav_list) == len(self.feat_list)
shuffle_idx = np.random.permutation(np.arange(len(self.data_list)))
self.data_list = [self.data_list[i] for i in shuffle_idx]
self.wav_list = [self.wav_list[i] for i in shuffle_idx]
self.feat_list = [self.feat_list[i] for i in shuffle_idx]
if debug_num:
self.data_list = self.data_list[:debug_num]
self.wav_list = self.wav_list[:debug_num]
self.feat_list = self.feat_list[:debug_num]
print('Split: {} Sample Num: {}'.format(split, len(self.data_list)))
def __len__(self):
return len(self.data_list)
def load_spec_and_feat(self, wav_path, video_feat_path):
"""Load audio spec and video feat"""
        try:
            wav_raw, _ = librosa.load(wav_path, sr=self.sr)  # mono, resampled to self.sr
        except Exception:
            print(f"corrupted wav: {wav_path}", flush=True)
            wav_raw = np.zeros((160000,), dtype=np.float32)  # placeholder [T]
        try:
            video_feat = np.load(video_feat_path)['feat'].astype(np.float32)
        except Exception:
            print(f"corrupted video: {video_feat_path}", flush=True)
            video_feat = np.load(os.path.join(os.path.dirname(video_feat_path), 'empty_vid.npz'))['feat'].astype(np.float32)
wav_len = self.sr * self.duration
if wav_raw.shape[0] < wav_len:
wav_raw = np.tile(wav_raw, math.ceil(wav_len / wav_raw.shape[0]))
wav_raw = wav_raw[:int(wav_len)]
feat_len = self.fps * self.duration
if video_feat.shape[0] < feat_len:
video_feat = np.tile(video_feat, (math.ceil(feat_len / video_feat.shape[0]), 1))
video_feat = video_feat[:int(feat_len)]
return wav_raw, video_feat
    def mix_audio_and_feat(self, wav_raw1=None, video_feat1=None, video_info_dict=None, mode='single'):
        """Return the truncated wav, the matching video feat, and the info dict."""
        if video_info_dict is None:  # avoid a shared mutable default argument
            video_info_dict = {}
        if mode == "single":
            # wav1:
if not self.fix_frames:
start_idx = random.randint(0, self.sr * self.duration - self.truncate - 1) # audio start
else:
start_idx = 0
wav_start = start_idx
wav_truncate = self.truncate
wav_raw1 = wav_raw1[wav_start: wav_start + wav_truncate]
start_frame = int(self.fps * start_idx / self.sr)
truncate_frame = int(self.fps * self.truncate / self.sr)
video_feat1 = video_feat1[start_frame: start_frame + truncate_frame]
# info_dict:
video_info_dict['video_time1'] = str(start_frame) + '_' + str(start_frame+truncate_frame) # Start frame, end frame
video_info_dict['video_time2'] = ""
return wav_raw1, video_feat1, video_info_dict
elif mode == "concat":
total_spec_len = int(self.truncate / self.hop_len)
# Random Trucate len:
spec1_truncate_len = random.randint(self.min_duration * self.sr // self.hop_len, total_spec_len - self.min_duration * self.sr // self.hop_len - 1)
spec2_truncate_len = total_spec_len - spec1_truncate_len
# Sample spec clip:
spec_start1 = random.randint(0, total_spec_len - spec1_truncate_len - 1)
spec_start2 = random.randint(0, total_spec_len - spec2_truncate_len - 1)
spec_end1, spec_end2 = spec_start1 + spec1_truncate_len, spec_start2 + spec2_truncate_len
# concat spec:
return video_info_dict
def __getitem__(self, idx):
audio_name1 = self.data_list[idx]
wav_path1 = self.wav_list[idx]
video_feat_path1 = self.feat_list[idx]
        # Optionally sample a second clip for concat augmentation.
        # Disabled: the threshold is -1, so the branch never runs.
        flag = False
        if random.uniform(0, 1) < -1:
            flag = True
            random_idx = idx
            while random_idx == idx:
                random_idx = random.randint(0, len(self.data_list) - 1)
            audio_name2 = self.data_list[random_idx]
            wav_path2 = self.wav_list[random_idx]
            video_feat_path2 = self.feat_list[random_idx]
        # Load the wav and video feat:
        wav_raw1, video_feat1 = self.load_spec_and_feat(wav_path1, video_feat_path1)
        if flag:  # unreachable while the branch above is disabled
            wav_raw2, video_feat2 = self.load_spec_and_feat(wav_path2, video_feat_path2)
            video_info_dict = {'audio_name1': audio_name1, 'audio_name2': audio_name2}
            mix_wav, mix_video_feat, mix_info = self.mix_audio_and_feat(wav_raw1=wav_raw1, video_feat1=video_feat1, video_info_dict=video_info_dict, mode='concat')
else:
video_info_dict = {'audio_name1':audio_name1, 'audio_name2': ""}
mix_wav, mix_video_feat, mix_info = self.mix_audio_and_feat(wav_raw1=wav_raw1, video_feat1=video_feat1, video_info_dict=video_info_dict, mode='single')
data_dict = {}
data_dict['mix_wav'] = mix_wav # (131072,)
data_dict['mix_video_feat'] = mix_video_feat # (32, 512)
data_dict['mix_info_dict'] = mix_info
return data_dict
class video_codec_Dataset_Train(video_codec_Dataset):
def __init__(self, dataset_cfg):
super().__init__(split='train', **dataset_cfg)
class video_codec_Dataset_Test(video_codec_Dataset):
def __init__(self, dataset_cfg):
super().__init__(split='test', **dataset_cfg)
class video_codec_Dataset_Valid(video_codec_Dataset):
def __init__(self, dataset_cfg):
super().__init__(split='valid', **dataset_cfg)
class audio_video_spec_fullset_Audioset_Dataset(torch.utils.data.Dataset):
    # Audio + video dataset over VGGSound plus AudioSet (AudioSet is added to the train split only).
def __init__(self, split, dataset1, dataset2, sr=22050, duration=10, truncate=220000,
fps=21.5, drop=0.0, fix_frames=False, hop_len=256):
super().__init__()
        if split == "train":
            self.split = "Train"
        elif split == "valid" or split == "test":
            self.split = "Test"
        else:
            raise ValueError(f"Unknown split: {split}")
# Default params:
self.min_duration = 2
self.sr = sr # 22050
self.duration = duration # 10
self.truncate = truncate # 220000
self.fps = fps
self.fix_frames = fix_frames
self.hop_len = hop_len
self.drop = drop
print("Fix Frames: {}".format(self.fix_frames))
print("Use Drop: {}".format(self.drop))
# Dataset1: (VGGSound)
assert dataset1.dataset_name == "VGGSound"
assert dataset2.dataset_name == "Audioset"
# spec_dir: spectrogram path
# feat_dir: CAVP feature path
# video_dir: video path
dataset1_spec_dir = os.path.join(dataset1.data_dir, "mel_maa2", "npy")
dataset1_feat_dir = os.path.join(dataset1.data_dir, "cavp")
split_txt_path = dataset1.split_txt_path
with open(os.path.join(split_txt_path, '{}.txt'.format(self.split)), "r") as f:
data_list1 = f.readlines()
data_list1 = list(map(lambda x: x.strip(), data_list1))
spec_list1 = list(map(lambda x: os.path.join(dataset1_spec_dir, x) + "_mel.npy", data_list1)) # spec
feat_list1 = list(map(lambda x: os.path.join(dataset1_feat_dir, x) + ".npz", data_list1)) # feat
if split == "train":
dataset2_spec_dir = os.path.join(dataset2.data_dir, "mel")
dataset2_feat_dir = os.path.join(dataset2.data_dir, "cavp_renamed")
split_txt_path = dataset2.split_txt_path
with open(os.path.join(split_txt_path, '{}.txt'.format(self.split)), "r") as f:
data_list2 = f.readlines()
data_list2 = list(map(lambda x: x.strip(), data_list2))
            spec_list2 = list(map(lambda x: os.path.join(dataset2_spec_dir, f'Y{x}') + "_mel.npy", data_list2))  # AudioSet mels carry a 'Y' filename prefix
feat_list2 = list(map(lambda x: os.path.join(dataset2_feat_dir, x) + ".npz", data_list2)) # feat
data_list1 += data_list2
spec_list1 += spec_list2
feat_list1 += feat_list2
        # Merge data (the Test split is capped at 200 samples for quick evaluation):
self.data_list = data_list1 if self.split != "Test" else data_list1[:200]
self.spec_list = spec_list1 if self.split != "Test" else spec_list1[:200]
self.feat_list = feat_list1 if self.split != "Test" else feat_list1[:200]
assert len(self.data_list) == len(self.spec_list) == len(self.feat_list)
shuffle_idx = np.random.permutation(np.arange(len(self.data_list)))
self.data_list = [self.data_list[i] for i in shuffle_idx]
self.spec_list = [self.spec_list[i] for i in shuffle_idx]
self.feat_list = [self.feat_list[i] for i in shuffle_idx]
print('Split: {} Sample Num: {}'.format(split, len(self.data_list)))
# self.check(self.spec_list)
def __len__(self):
return len(self.data_list)
    def check(self, feat_list):
        """Debug helper: verify that every cached mel has 80 bins."""
        from tqdm import tqdm
        for spec_path in tqdm(feat_list):
            mel = np.load(spec_path).astype(np.float32)
            assert mel.shape[0] == 80, f"bad mel shape {mel.shape}: {spec_path}"
def load_spec_and_feat(self, spec_path, video_feat_path):
"""Load audio spec and video feat"""
spec_raw = np.load(spec_path).astype(np.float32) # channel: 1
if spec_raw.shape[0] != 80:
print(f"corrupted mel: {spec_path}", flush=True)
spec_raw = np.zeros((80, 625), dtype=np.float32) # [C, T]
        # With probability `self.drop`, replace the video condition with the
        # precomputed empty-video feature (condition dropout):
        p = np.random.uniform(0, 1)
        if p > self.drop:
            try:
                video_feat = np.load(video_feat_path)['feat'].astype(np.float32)
            except Exception:
                print(f"corrupted video: {video_feat_path}", flush=True)
                video_feat = np.load(os.path.join(os.path.dirname(video_feat_path), 'empty_vid.npz'))['feat'].astype(np.float32)
else:
video_feat = np.load(os.path.join(os.path.dirname(video_feat_path), 'empty_vid.npz'))['feat'].astype(np.float32)
spec_len = self.sr * self.duration / self.hop_len
if spec_raw.shape[1] < spec_len:
spec_raw = np.tile(spec_raw, math.ceil(spec_len / spec_raw.shape[1]))
spec_raw = spec_raw[:, :int(spec_len)]
feat_len = self.fps * self.duration
if video_feat.shape[0] < feat_len:
video_feat = np.tile(video_feat, (math.ceil(feat_len / video_feat.shape[0]), 1))
video_feat = video_feat[:int(feat_len)]
return spec_raw, video_feat
    def mix_audio_and_feat(self, spec1=None, spec2=None, video_feat1=None, video_feat2=None, video_info_dict=None, mode='single'):
        """Return the (possibly concatenated) spec and video feat."""
        if video_info_dict is None:  # avoid a shared mutable default argument
            video_info_dict = {}
        if mode == "single":
# spec1:
if not self.fix_frames:
start_idx = random.randint(0, self.sr * self.duration - self.truncate - 1) # audio start
else:
start_idx = 0
start_frame = int(self.fps * start_idx / self.sr)
truncate_frame = int(self.fps * self.truncate / self.sr)
# Spec Start & Truncate:
spec_start = int(start_idx / self.hop_len)
spec_truncate = int(self.truncate / self.hop_len)
spec1 = spec1[:, spec_start: spec_start + spec_truncate]
video_feat1 = video_feat1[start_frame: start_frame + truncate_frame]
# info_dict:
video_info_dict['video_time1'] = str(start_frame) + '_' + str(
start_frame + truncate_frame) # Start frame, end frame
video_info_dict['video_time2'] = ""
return spec1, video_feat1, video_info_dict
elif mode == "concat":
total_spec_len = int(self.truncate / self.hop_len)
            # Random truncate length:
spec1_truncate_len = random.randint(self.min_duration * self.sr // self.hop_len,
total_spec_len - self.min_duration * self.sr // self.hop_len - 1)
spec2_truncate_len = total_spec_len - spec1_truncate_len
# Sample spec clip:
spec_start1 = random.randint(0, total_spec_len - spec1_truncate_len - 1)
spec_start2 = random.randint(0, total_spec_len - spec2_truncate_len - 1)
spec_end1, spec_end2 = spec_start1 + spec1_truncate_len, spec_start2 + spec2_truncate_len
# concat spec:
spec1, spec2 = spec1[:, spec_start1: spec_end1], spec2[:, spec_start2: spec_end2]
concat_audio_spec = np.concatenate([spec1, spec2], axis=1)
# Concat Video Feat:
start1_frame, truncate1_frame = int(self.fps * spec_start1 * self.hop_len / self.sr), int(
self.fps * spec1_truncate_len * self.hop_len / self.sr)
start2_frame, truncate2_frame = int(self.fps * spec_start2 * self.hop_len / self.sr), int(
self.fps * self.truncate / self.sr) - truncate1_frame
video_feat1, video_feat2 = video_feat1[start1_frame: start1_frame + truncate1_frame], video_feat2[
start2_frame: start2_frame + truncate2_frame]
concat_video_feat = np.concatenate([video_feat1, video_feat2])
video_info_dict['video_time1'] = str(start1_frame) + '_' + str(
start1_frame + truncate1_frame) # Start frame, end frame
video_info_dict['video_time2'] = str(start2_frame) + '_' + str(start2_frame + truncate2_frame)
return concat_audio_spec, concat_video_feat, video_info_dict
def __getitem__(self, idx):
audio_name1 = self.data_list[idx]
spec_npy_path1 = self.spec_list[idx]
video_feat_path1 = self.feat_list[idx]
        # Optionally sample a second video for concat augmentation.
        # Disabled here: the threshold is -1, so the branch never runs.
        flag = False
        if random.uniform(0, 1) < -1:
flag = True
random_idx = idx
while random_idx == idx:
random_idx = random.randint(0, len(self.data_list) - 1)
audio_name2 = self.data_list[random_idx]
spec_npy_path2 = self.spec_list[random_idx]
video_feat_path2 = self.feat_list[random_idx]
# Load the Spec and Feat:
spec1, video_feat1 = self.load_spec_and_feat(spec_npy_path1, video_feat_path1)
if flag:
spec2, video_feat2 = self.load_spec_and_feat(spec_npy_path2, video_feat_path2)
video_info_dict = {'audio_name1': audio_name1, 'audio_name2': audio_name2}
mix_spec, mix_video_feat, mix_info = self.mix_audio_and_feat(spec1, spec2, video_feat1, video_feat2, video_info_dict, mode='concat')
else:
video_info_dict = {'audio_name1': audio_name1, 'audio_name2': ""}
mix_spec, mix_video_feat, mix_info = self.mix_audio_and_feat(spec1=spec1, video_feat1=video_feat1, video_info_dict=video_info_dict, mode='single')
# print("mix spec shape:", mix_spec.shape)
# print("mix video feat:", mix_video_feat.shape)
data_dict = {}
data_dict['mix_spec'] = mix_spec # (80, 512)
data_dict['mix_video_feat'] = mix_video_feat # (32, 512)
data_dict['mix_info_dict'] = mix_info
return data_dict
class audio_video_spec_fullset_Audioset_Train(audio_video_spec_fullset_Audioset_Dataset):
def __init__(self, dataset_cfg):
super().__init__(split='train', **dataset_cfg)
class audio_video_spec_fullset_Audioset_Valid(audio_video_spec_fullset_Audioset_Dataset):
def __init__(self, dataset_cfg):
super().__init__(split='valid', **dataset_cfg)
class audio_video_spec_fullset_Audioset_Test(audio_video_spec_fullset_Audioset_Dataset):
def __init__(self, dataset_cfg):
super().__init__(split='test', **dataset_cfg)
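# A minimal smoke-test sketch (hypothetical local paths; in practice
# `dataset_cfg` comes from the training configs). `SimpleNamespace` stands in
# for the config node the constructors expect: any object exposing
# `dataset_name`, `data_dir`, `video_dir`, and `split_txt_path` works.
if __name__ == "__main__":
    from types import SimpleNamespace

    vggsound = SimpleNamespace(
        dataset_name="VGGSound",
        data_dir="data/vggsound",               # hypothetical: holds mel_maa2/ and cavp/
        video_dir="data/vggsound",              # hypothetical: holds tmp_vid/
        split_txt_path="data/vggsound/splits",  # hypothetical: holds Train.txt / Test.txt
    )
    dataset = audio_video_spec_fullset_Dataset_Test(
        {"dataset1": vggsound, "sr": 22050, "duration": 10, "truncate": 131072}
    )
    sample = dataset[0]
    print(sample["mix_spec"].shape, sample["mix_video_feat"].shape)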