"""Torch datasets that pair audio (mel spectrograms or waveforms) with video features.

All datasets read a ``Train.txt`` / ``Test.txt`` name list from the dataset
config's ``split_txt_path`` and derive per-sample file paths from the names.
The ``valid`` and ``test`` splits both read ``Test.txt``.
"""
import csv
import math
import os
import pickle
import random
import sys

import numpy as np
import torch

try:
    import librosa
except ImportError:
    # Only the waveform datasets (audio_Dataset, video_codec_Dataset) call
    # librosa; the spectrogram datasets stay importable without it.
    librosa = None

# The "Test" split of the spectrogram datasets only keeps this many leading names.
_TEST_SUBSET_SIZE = 200
# Replacement data used when a file cannot be read.
_FALLBACK_SPEC_SHAPE = (80, 625)  # [C, T]
_FALLBACK_WAV_SAMPLES = 160000  # [T]


def _resolve_split(split):
    """Map the public split name to the prefix of the split list file."""
    if split == "train":
        return "Train"
    if split in ("valid", "test"):
        return "Test"
    raise ValueError(
        "split must be 'train', 'valid' or 'test', got {!r}".format(split))


def _read_name_list(split_txt_path, split_name):
    """Read ``<split_name>.txt`` and return the stripped, non-blank lines."""
    list_path = os.path.join(split_txt_path, '{}.txt'.format(split_name))
    with open(list_path, "r") as f:
        # Blank lines would otherwise become samples with an empty name.
        return [line.strip() for line in f if line.strip()]


def _shuffle_together(*lists):
    """Apply one random permutation (numpy RNG) to several equal-length lists."""
    order = np.random.permutation(np.arange(len(lists[0])))
    return [[seq[i] for i in order] for seq in lists]


def _require_librosa():
    """Raise a clear error when a waveform dataset is used without librosa."""
    if librosa is None:
        raise ImportError(
            "librosa is required by the waveform datasets "
            "(audio_Dataset, video_codec_Dataset)")


def _load_npz_feat(path):
    """Return the ``'feat'`` array of an ``.npz`` file as float32, closing the file."""
    with np.load(path) as data:
        return data['feat'].astype(np.float32)


def _load_video_feat(video_feat_path, use_empty=False):
    """Load a video feature file, falling back to the sibling ``empty_vid.npz``.

    The fallback is used when ``use_empty`` is true or when the file cannot be
    read. A missing ``empty_vid.npz`` still raises.
    """
    if not use_empty:
        try:
            return _load_npz_feat(video_feat_path)
        except Exception:
            print(f"corrupted video: {video_feat_path}", flush=True)
    empty_path = os.path.join(os.path.dirname(video_feat_path), 'empty_vid.npz')
    return _load_npz_feat(empty_path)


def _load_spec(spec_path, n_mels=None):
    """Load a ``.npy`` spectrogram as float32; return zeros if it is unusable.

    A file is unusable when it cannot be read, has no time steps, or (when
    ``n_mels`` is given) its first dimension differs from ``n_mels``.
    """
    try:
        spec_raw = np.load(spec_path).astype(np.float32)
        if spec_raw.shape[1] == 0:
            raise ValueError("spectrogram has no time steps")
        if n_mels is not None and spec_raw.shape[0] != n_mels:
            raise ValueError("unexpected number of mel bins")
    except Exception:
        print(f"corrupted mel: {spec_path}", flush=True)
        spec_raw = np.zeros(_FALLBACK_SPEC_SHAPE, dtype=np.float32)
    return spec_raw


def _load_wav(wav_path, sr):
    """Load a waveform with librosa at ``sr``; return zeros if it is unusable."""
    # Checked outside the try block so a missing dependency is not reported
    # as a corrupted file.
    _require_librosa()
    try:
        wav_raw, _ = librosa.load(wav_path, sr=sr)
        if wav_raw.shape[0] == 0:
            raise ValueError("waveform is empty")
    except Exception:
        print(f"corrupted wav: {wav_path}", flush=True)
        wav_raw = np.zeros((_FALLBACK_WAV_SAMPLES,), dtype=np.float32)
    return wav_raw


def _tile_to_length(arr, target_len, axis=0):
    """Repeat ``arr`` along ``axis`` until it reaches ``target_len``, then crop.

    ``target_len`` may be fractional; the crop keeps ``int(target_len)`` items.
    """
    current = arr.shape[axis]
    if current == 0:
        raise ValueError("cannot tile an array that is empty along axis {}".format(axis))
    if current < target_len:
        reps = [1] * arr.ndim
        reps[axis] = math.ceil(target_len / current)
        arr = np.tile(arr, reps)
    index = [slice(None)] * arr.ndim
    index[axis] = slice(0, int(target_len))
    return arr[tuple(index)]


def _sample_audio_start(total_samples, truncate, fix_frames):
    """Pick the first audio sample of a ``truncate``-long crop.

    Returns 0 when ``fix_frames`` is set. Otherwise draws uniformly from
    ``[0, total_samples - truncate - 1]``; when the crop is exactly as long as
    the clip the only possible start, 0, is returned instead of failing.
    """
    if fix_frames:
        return 0
    slack = int(total_samples) - int(truncate)
    if slack < 0:
        raise ValueError(
            "truncate ({}) is longer than the clip ({} samples)".format(
                truncate, total_samples))
    return random.randint(0, max(0, slack - 1))


def _frame_span(start_frame, n_frames):
    """Format a frame range as ``'<start>_<end>'``."""
    return str(start_frame) + '_' + str(start_frame + n_frames)


def _crop_spec_and_feat(spec, video_feat, start_idx, truncate, sr, fps, hop_len):
    """Crop a spectrogram and its video features to the same audio window.

    ``start_idx`` and ``truncate`` are in audio samples; they are converted to
    spectrogram columns with ``hop_len`` and to video frames with ``fps / sr``.
    Returns ``(spec, video_feat, start_frame, truncate_frame)``.
    """
    start_frame = int(fps * start_idx / sr)
    truncate_frame = int(fps * truncate / sr)
    spec_start = int(start_idx / hop_len)
    spec_truncate = int(truncate / hop_len)
    spec = spec[:, spec_start:spec_start + spec_truncate]
    video_feat = video_feat[start_frame:start_frame + truncate_frame]
    return spec, video_feat, start_frame, truncate_frame


def _concat_spec_and_feat(spec1, spec2, video_feat1, video_feat2,
                          truncate, sr, fps, hop_len, min_duration):
    """Concatenate a random segment of sample 1 with a random segment of sample 2.

    The two spectrogram segments together span ``int(truncate / hop_len)``
    columns and each is at least ``min_duration`` seconds long. The video
    features are cut at the matching frame positions.
    Returns ``(spec, video_feat, (start1, n1), (start2, n2))`` where the pairs
    are the start frame and frame count taken from each sample.
    """
    total_spec_len = int(truncate / hop_len)
    min_spec_len = min_duration * sr // hop_len

    # Random split point, then a random start for each segment.
    spec1_len = random.randint(min_spec_len, total_spec_len - min_spec_len - 1)
    spec2_len = total_spec_len - spec1_len
    spec_start1 = random.randint(0, total_spec_len - spec1_len - 1)
    spec_start2 = random.randint(0, total_spec_len - spec2_len - 1)

    concat_spec = np.concatenate(
        [spec1[:, spec_start1:spec_start1 + spec1_len],
         spec2[:, spec_start2:spec_start2 + spec2_len]],
        axis=1)

    start1_frame = int(fps * spec_start1 * hop_len / sr)
    truncate1_frame = int(fps * spec1_len * hop_len / sr)
    start2_frame = int(fps * spec_start2 * hop_len / sr)
    # The second segment takes whatever is left of the total frame budget.
    truncate2_frame = int(fps * truncate / sr) - truncate1_frame

    concat_feat = np.concatenate(
        [video_feat1[start1_frame:start1_frame + truncate1_frame],
         video_feat2[start2_frame:start2_frame + truncate2_frame]])
    return (concat_spec, concat_feat,
            (start1_frame, truncate1_frame), (start2_frame, truncate2_frame))


class audio_video_spec_fullset_Dataset(torch.utils.data.Dataset):
    """VGGSound mel spectrograms paired with CAVP video features.

    Each item is either a single cropped sample or, with probability
    ``concat_prob``, two different samples concatenated along time.

    Args:
        split: ``'train'``, ``'valid'`` or ``'test'``.
        dataset1: config object exposing ``dataset_name`` (must be
            ``"VGGSound"``), ``data_dir``, ``video_dir`` and ``split_txt_path``.
        feat_type, transforms: accepted but not used by this class.
        sr: audio sample rate.
        duration: clip length in seconds that loaded data is tiled/cropped to.
        truncate: crop length in audio samples.
        fps: video feature frames per second.
        drop: the real video features are replaced by ``empty_vid.npz`` when a
            uniform draw in [0, 1) is not greater than this value.
        fix_frames: always crop from the start instead of a random offset.
        hop_len: audio samples per spectrogram column.
    """

    # Probability of building a two-sample concatenation in __getitem__.
    concat_prob = 0.5

    def __init__(self, split, dataset1, feat_type='clip', transforms=None, sr=22050, duration=10,
                 truncate=220000, fps=21.5, drop=0.0, fix_frames=False, hop_len=256):
        super().__init__()
        self.split = _resolve_split(split)

        # Default params:
        self.min_duration = 2
        self.sr = sr
        self.duration = duration
        self.truncate = truncate
        self.fps = fps
        self.fix_frames = fix_frames
        self.hop_len = hop_len
        self.drop = drop
        print("Fix Frames: {}".format(self.fix_frames))
        print("Use Drop: {}".format(self.drop))

        # Dataset1: (VGGSound)
        assert dataset1.dataset_name == "VGGSound"
        spec_dir = os.path.join(dataset1.data_dir, "mel_maa2", "npy")  # spectrogram path
        feat_dir = os.path.join(dataset1.data_dir, "cavp")  # CAVP feature path
        video_dir = os.path.join(dataset1.video_dir, "tmp_vid")  # video path

        names = _read_name_list(dataset1.split_txt_path, self.split)
        if self.split == "Test":
            names = names[:_TEST_SUBSET_SIZE]
        spec_paths = [os.path.join(spec_dir, name) + "_mel.npy" for name in names]
        feat_paths = [os.path.join(feat_dir, name) + ".npz" for name in names]
        video_paths = [os.path.join(video_dir, name) + "_new_fps_21.5_truncate_0_10.0.mp4"
                       for name in names]
        assert len(names) == len(spec_paths) == len(feat_paths) == len(video_paths)

        (self.data_list, self.spec_list,
         self.feat_list, self.video_list) = _shuffle_together(
            names, spec_paths, feat_paths, video_paths)
        print('Split: {} Sample Num: {}'.format(split, len(self.data_list)))

    def __len__(self):
        return len(self.data_list)

    def load_spec_and_feat(self, spec_path, video_feat_path):
        """Load one spectrogram and its video features, both fitted to ``duration``.

        Unreadable spectrograms become zeros and unreadable (or dropped) video
        features come from ``empty_vid.npz`` next to the feature file. Both
        arrays are tiled along time when too short and then cropped.

        Returns:
            ``(spec_raw, video_feat)`` as float32 arrays.
        """
        spec_raw = _load_spec(spec_path)
        dropped = not (np.random.uniform(0, 1) > self.drop)
        video_feat = _load_video_feat(video_feat_path, use_empty=dropped)

        spec_raw = _tile_to_length(spec_raw, self.sr * self.duration / self.hop_len, axis=1)
        video_feat = _tile_to_length(video_feat, self.fps * self.duration, axis=0)
        return spec_raw, video_feat

    def mix_audio_and_feat(self, spec1=None, spec2=None, video_feat1=None, video_feat2=None,
                           video_info_dict=None, mode='single'):
        """Crop one sample (``'single'``) or splice two samples (``'concat'``).

        ``video_info_dict`` is updated in place with ``'video_time1'`` and
        ``'video_time2'`` (``'<start frame>_<end frame>'``, empty for an unused
        second sample). A new dict is created when none is passed.

        Returns:
            ``(spec, video_feat, video_info_dict)``.

        Raises:
            ValueError: for an unknown ``mode``.
        """
        if video_info_dict is None:
            video_info_dict = {}

        if mode == "single":
            start_idx = _sample_audio_start(self.sr * self.duration, self.truncate, self.fix_frames)
            spec1, video_feat1, start_frame, truncate_frame = _crop_spec_and_feat(
                spec1, video_feat1, start_idx, self.truncate, self.sr, self.fps, self.hop_len)
            video_info_dict['video_time1'] = _frame_span(start_frame, truncate_frame)
            video_info_dict['video_time2'] = ""
            return spec1, video_feat1, video_info_dict

        if mode == "concat":
            concat_spec, concat_feat, span1, span2 = _concat_spec_and_feat(
                spec1, spec2, video_feat1, video_feat2,
                self.truncate, self.sr, self.fps, self.hop_len, self.min_duration)
            video_info_dict['video_time1'] = _frame_span(*span1)
            video_info_dict['video_time2'] = _frame_span(*span2)
            return concat_spec, concat_feat, video_info_dict

        raise ValueError("unknown mode: {!r}".format(mode))

    def __getitem__(self, idx):
        """Return a dict with ``'mix_spec'``, ``'mix_video_feat'`` and ``'mix_info_dict'``."""
        audio_name1 = self.data_list[idx]
        video_path1 = self.video_list[idx]

        # A partner distinct from idx only exists with at least two samples;
        # without this check the search loop below would never terminate.
        use_concat = random.uniform(0, 1) < self.concat_prob and len(self.data_list) > 1
        if use_concat:
            random_idx = idx
            while random_idx == idx:
                random_idx = random.randint(0, len(self.data_list) - 1)

        spec1, video_feat1 = self.load_spec_and_feat(self.spec_list[idx], self.feat_list[idx])
        if use_concat:
            spec2, video_feat2 = self.load_spec_and_feat(
                self.spec_list[random_idx], self.feat_list[random_idx])
            video_info_dict = {'audio_name1': audio_name1,
                               'audio_name2': self.data_list[random_idx],
                               'video_path1': video_path1,
                               'video_path2': self.video_list[random_idx]}
            mix_spec, mix_video_feat, mix_info = self.mix_audio_and_feat(
                spec1, spec2, video_feat1, video_feat2, video_info_dict, mode='concat')
        else:
            video_info_dict = {'audio_name1': audio_name1, 'audio_name2': "",
                               'video_path1': video_path1, 'video_path2': ""}
            mix_spec, mix_video_feat, mix_info = self.mix_audio_and_feat(
                spec1=spec1, video_feat1=video_feat1,
                video_info_dict=video_info_dict, mode='single')

        # NOTE: mix_spec is returned without the former 3-channel repeat
        # (mix_spec[None].repeat(3, axis=0)); the original TODO said that repeat
        # had to be changed because it does not fit maa's autoencoder.
        return {
            'mix_spec': mix_spec,
            'mix_video_feat': mix_video_feat,
            'mix_info_dict': mix_info,
        }


class audio_video_spec_fullset_Dataset_Train(audio_video_spec_fullset_Dataset):
    """``audio_video_spec_fullset_Dataset`` on the train split, built from a kwargs mapping."""

    def __init__(self, dataset_cfg):
        super().__init__(split='train', **dataset_cfg)


class audio_video_spec_fullset_Dataset_Valid(audio_video_spec_fullset_Dataset):
    """``audio_video_spec_fullset_Dataset`` on the valid split, built from a kwargs mapping."""

    def __init__(self, dataset_cfg):
        super().__init__(split='valid', **dataset_cfg)


class audio_video_spec_fullset_Dataset_Test(audio_video_spec_fullset_Dataset):
    """``audio_video_spec_fullset_Dataset`` on the test split, built from a kwargs mapping."""

    def __init__(self, dataset_cfg):
        super().__init__(split='test', **dataset_cfg)


class audio_video_spec_fullset_Dataset_inpaint(audio_video_spec_fullset_Dataset):
    """Inpainting variant: returns the clean spectrogram plus a masked copy.

    A random window is zeroed in the copy of the spectrogram and in the video
    features; the clean spectrogram is the target.
    """

    def __getitem__(self, idx):
        """Return a dict with ``'mix_spec'``, ``'hybrid_feat'`` and ``'mix_info_dict'``."""
        spec1, video_feat1 = self.load_spec_and_feat(self.spec_list[idx], self.feat_list[idx])
        video_info_dict = {'audio_name1': self.data_list[idx], 'audio_name2': "",
                           'video_path1': self.video_list[idx], 'video_path2': ""}
        mix_spec, mix_masked_spec, mix_video_feat, mix_info = self.mix_audio_and_feat(
            spec1=spec1, video_feat1=video_feat1, video_info_dict=video_info_dict)

        return {
            'mix_spec': mix_spec,
            'hybrid_feat': {'mix_video_feat': mix_video_feat, 'mix_spec': mix_masked_spec},
            'mix_info_dict': mix_info,
        }

    def mix_audio_and_feat(self, spec1=None, video_feat1=None, video_info_dict=None):
        """Crop one sample and zero out a random window.

        The window length is a multiple of 16 spectrogram columns and covers
        at most 50% of the crop. The inputs are not modified: masking is done
        on copies.

        Returns:
            ``(spec, masked_spec, masked_video_feat, video_info_dict)``.
        """
        if video_info_dict is None:
            video_info_dict = {}

        start_idx = _sample_audio_start(self.sr * self.duration, self.truncate, self.fix_frames)
        spec1, video_feat1, start_frame, truncate_frame = _crop_spec_and_feat(
            spec1, video_feat1, start_idx, self.truncate, self.sr, self.fps, self.hop_len)

        # Mask length: a multiple of 16 columns, at most 50% of the crop.
        spec_truncate = int(self.truncate / self.hop_len)
        masked_cols = random.randint(1, int(spec_truncate * 0.5 // 16)) * 16
        masked_truncate = int(masked_cols * self.hop_len)
        masked_frame = int(self.fps * masked_truncate / self.sr)

        # Mask position, sampled in audio samples and converted to both grids.
        start_masked_idx = random.randint(0, self.truncate - masked_truncate - 1)
        start_masked_frame = int(self.fps * start_masked_idx / self.sr)
        start_masked_spec = int(start_masked_idx / self.hop_len)

        # Scalar assignment keeps the masking independent of the number of mel
        # bins and of the video feature width.
        masked_spec1 = spec1.astype(np.float32)  # astype returns a copy
        masked_spec1[:, start_masked_spec:start_masked_spec + masked_cols] = 0.0
        video_feat1 = video_feat1.copy()
        video_feat1[start_masked_frame:start_masked_frame + masked_frame] = 0.0

        video_info_dict['video_time1'] = _frame_span(start_frame, truncate_frame)
        video_info_dict['video_time2'] = ""
        return spec1, masked_spec1, video_feat1, video_info_dict


class audio_video_spec_fullset_Dataset_inpaint_Train(audio_video_spec_fullset_Dataset_inpaint):
    """Inpainting dataset on the train split, built from a kwargs mapping."""

    def __init__(self, dataset_cfg):
        super().__init__(split='train', **dataset_cfg)


class audio_video_spec_fullset_Dataset_inpaint_Valid(audio_video_spec_fullset_Dataset_inpaint):
    """Inpainting dataset on the valid split, built from a kwargs mapping."""

    def __init__(self, dataset_cfg):
        super().__init__(split='valid', **dataset_cfg)


class audio_video_spec_fullset_Dataset_inpaint_Test(audio_video_spec_fullset_Dataset_inpaint):
    """Inpainting dataset on the test split, built from a kwargs mapping."""

    def __init__(self, dataset_cfg):
        super().__init__(split='test', **dataset_cfg)


class audio_Dataset(torch.utils.data.Dataset):
    """VGGSound waveforms only (requires librosa).

    Args:
        split: ``'train'``, ``'valid'`` or ``'test'``.
        dataset1: config object exposing ``dataset_name`` (must be
            ``"VGGSound"``), ``wav_dir`` and ``split_txt_path``.
        sr, duration, truncate, fix_frames, hop_len: see
            ``audio_video_spec_fullset_Dataset``.
        debug_num: when truthy, keep only this many samples after shuffling.
    """

    def __init__(self, split, dataset1, sr=22050, duration=10, truncate=220000,
                 debug_num=False, fix_frames=False, hop_len=256):
        super().__init__()
        _require_librosa()
        self.split = _resolve_split(split)

        # Default params:
        self.min_duration = 2
        self.sr = sr
        self.duration = duration
        self.truncate = truncate
        self.fix_frames = fix_frames
        self.hop_len = hop_len
        print("Fix Frames: {}".format(self.fix_frames))

        # Dataset1: (VGGSound)
        assert dataset1.dataset_name == "VGGSound"
        wav_dir = os.path.join(dataset1.wav_dir, "wav")

        names = _read_name_list(dataset1.split_txt_path, self.split)
        wav_paths = [os.path.join(wav_dir, name) + ".wav" for name in names]
        assert len(names) == len(wav_paths)

        self.data_list, self.wav_list = _shuffle_together(names, wav_paths)
        if debug_num:
            self.data_list = self.data_list[:debug_num]
            self.wav_list = self.wav_list[:debug_num]
        print('Split: {} Sample Num: {}'.format(split, len(self.data_list)))

    def __len__(self):
        return len(self.data_list)

    def load_spec_and_feat(self, wav_path):
        """Load a waveform and tile/crop it to ``sr * duration`` samples.

        Unreadable files are replaced by zeros.
        """
        wav_raw = _load_wav(wav_path, self.sr)
        return _tile_to_length(wav_raw, self.sr * self.duration, axis=0)

    def mix_audio_and_feat(self, wav_raw1=None, video_info_dict=None, mode='single'):
        """Crop ``truncate`` samples from a waveform.

        Returns:
            ``(wav, video_info_dict)`` for ``mode='single'``. ``mode='concat'``
            is an unfinished placeholder that returns only ``video_info_dict``.

        Raises:
            ValueError: for an unknown ``mode``.
        """
        if video_info_dict is None:
            video_info_dict = {}

        if mode == "single":
            start_idx = _sample_audio_start(self.sr * self.duration, self.truncate, self.fix_frames)
            return wav_raw1[start_idx:start_idx + self.truncate], video_info_dict

        if mode == "concat":
            # Concatenation was never implemented for waveforms; the original
            # return value is preserved for compatibility.
            return video_info_dict

        raise ValueError("unknown mode: {!r}".format(mode))

    def __getitem__(self, idx):
        """Return a dict with ``'mix_wav'`` and ``'mix_info_dict'``."""
        # The former two-sample branch was guarded by ``uniform(0, 1) < -1`` and
        # therefore unreachable (and would have failed if entered), so only the
        # single-sample path remains.
        wav_raw1 = self.load_spec_and_feat(self.wav_list[idx])
        video_info_dict = {'audio_name1': self.data_list[idx], 'audio_name2': ""}
        mix_wav, mix_info = self.mix_audio_and_feat(
            wav_raw1=wav_raw1, video_info_dict=video_info_dict, mode='single')
        return {'mix_wav': mix_wav, 'mix_info_dict': mix_info}


class audio_Dataset_Train(audio_Dataset):
    """``audio_Dataset`` on the train split, built from a kwargs mapping."""

    def __init__(self, dataset_cfg):
        super().__init__(split='train', **dataset_cfg)


class audio_Dataset_Test(audio_Dataset):
    """``audio_Dataset`` on the test split, built from a kwargs mapping."""

    def __init__(self, dataset_cfg):
        super().__init__(split='test', **dataset_cfg)


class audio_Dataset_Valid(audio_Dataset):
    """``audio_Dataset`` on the valid split, built from a kwargs mapping."""

    def __init__(self, dataset_cfg):
        super().__init__(split='valid', **dataset_cfg)


class video_codec_Dataset(torch.utils.data.Dataset):
    """VGGSound waveforms paired with CAVP video features (requires librosa).

    Args:
        split: ``'train'``, ``'valid'`` or ``'test'``.
        dataset1: config object exposing ``dataset_name`` (must be
            ``"VGGSound"``), ``data_dir``, ``wav_dir`` and ``split_txt_path``.
        sr, duration, truncate, fps, fix_frames, hop_len: see
            ``audio_video_spec_fullset_Dataset``.
        debug_num: when truthy, keep only this many samples after shuffling.
    """

    def __init__(self, split, dataset1, sr=22050, duration=10, truncate=220000, fps=21.5,
                 debug_num=False, fix_frames=False, hop_len=256):
        super().__init__()
        _require_librosa()
        self.split = _resolve_split(split)

        # Default params:
        self.min_duration = 2
        self.fps = fps
        self.sr = sr
        self.duration = duration
        self.truncate = truncate
        self.fix_frames = fix_frames
        self.hop_len = hop_len
        print("Fix Frames: {}".format(self.fix_frames))

        # Dataset1: (VGGSound)
        assert dataset1.dataset_name == "VGGSound"
        feat_dir = os.path.join(dataset1.data_dir, "cavp")
        wav_dir = os.path.join(dataset1.wav_dir, "wav")

        names = _read_name_list(dataset1.split_txt_path, self.split)
        wav_paths = [os.path.join(wav_dir, name) + ".wav" for name in names]
        feat_paths = [os.path.join(feat_dir, name) + ".npz" for name in names]
        assert len(names) == len(wav_paths) == len(feat_paths)

        self.data_list, self.wav_list, self.feat_list = _shuffle_together(
            names, wav_paths, feat_paths)
        if debug_num:
            self.data_list = self.data_list[:debug_num]
            self.wav_list = self.wav_list[:debug_num]
            self.feat_list = self.feat_list[:debug_num]
        print('Split: {} Sample Num: {}'.format(split, len(self.data_list)))

    def __len__(self):
        return len(self.data_list)

    def load_spec_and_feat(self, wav_path, video_feat_path):
        """Load a waveform and its video features, both fitted to ``duration``.

        Unreadable waveforms become zeros; unreadable video features come from
        ``empty_vid.npz`` next to the feature file.

        Returns:
            ``(wav_raw, video_feat)``.
        """
        wav_raw = _load_wav(wav_path, self.sr)
        video_feat = _load_video_feat(video_feat_path)

        wav_raw = _tile_to_length(wav_raw, self.sr * self.duration, axis=0)
        video_feat = _tile_to_length(video_feat, self.fps * self.duration, axis=0)
        return wav_raw, video_feat

    def mix_audio_and_feat(self, wav_raw1=None, video_feat1=None, video_info_dict=None,
                           mode='single'):
        """Crop a waveform and the matching video frames.

        Returns:
            ``(wav, video_feat, video_info_dict)`` for ``mode='single'``.
            ``mode='concat'`` is an unfinished placeholder that returns only
            ``video_info_dict``.

        Raises:
            ValueError: for an unknown ``mode``.
        """
        if video_info_dict is None:
            video_info_dict = {}

        if mode == "single":
            start_idx = _sample_audio_start(self.sr * self.duration, self.truncate, self.fix_frames)
            wav_raw1 = wav_raw1[start_idx:start_idx + self.truncate]

            start_frame = int(self.fps * start_idx / self.sr)
            truncate_frame = int(self.fps * self.truncate / self.sr)
            video_feat1 = video_feat1[start_frame:start_frame + truncate_frame]

            video_info_dict['video_time1'] = _frame_span(start_frame, truncate_frame)
            video_info_dict['video_time2'] = ""
            return wav_raw1, video_feat1, video_info_dict

        if mode == "concat":
            # Concatenation was never implemented for waveforms; the original
            # return value is preserved for compatibility.
            return video_info_dict

        raise ValueError("unknown mode: {!r}".format(mode))

    def __getitem__(self, idx):
        """Return a dict with ``'mix_wav'``, ``'mix_video_feat'`` and ``'mix_info_dict'``."""
        # The former two-sample branch was guarded by ``uniform(0, 1) < -1`` and
        # therefore unreachable (and would have failed if entered), so only the
        # single-sample path remains.
        wav_raw1, video_feat1 = self.load_spec_and_feat(self.wav_list[idx], self.feat_list[idx])
        video_info_dict = {'audio_name1': self.data_list[idx], 'audio_name2': ""}
        mix_wav, mix_video_feat, mix_info = self.mix_audio_and_feat(
            wav_raw1=wav_raw1, video_feat1=video_feat1,
            video_info_dict=video_info_dict, mode='single')
        return {
            'mix_wav': mix_wav,
            'mix_video_feat': mix_video_feat,
            'mix_info_dict': mix_info,
        }


class video_codec_Dataset_Train(video_codec_Dataset):
    """``video_codec_Dataset`` on the train split, built from a kwargs mapping."""

    def __init__(self, dataset_cfg):
        super().__init__(split='train', **dataset_cfg)


class video_codec_Dataset_Test(video_codec_Dataset):
    """``video_codec_Dataset`` on the test split, built from a kwargs mapping."""

    def __init__(self, dataset_cfg):
        super().__init__(split='test', **dataset_cfg)


class video_codec_Dataset_Valid(video_codec_Dataset):
    """``video_codec_Dataset`` on the valid split, built from a kwargs mapping."""

    def __init__(self, dataset_cfg):
        super().__init__(split='valid', **dataset_cfg)


class audio_video_spec_fullset_Audioset_Dataset(torch.utils.data.Dataset):
    """VGGSound (plus Audioset on the train split) spectrograms with CAVP features.

    Args:
        split: ``'train'``, ``'valid'`` or ``'test'``.
        dataset1: VGGSound config (``dataset_name``, ``data_dir``,
            ``split_txt_path``).
        dataset2: Audioset config with the same attributes; its samples are
            only added when ``split == 'train'``.
        sr, duration, truncate, fps, drop, fix_frames, hop_len: see
            ``audio_video_spec_fullset_Dataset``.
    """

    # Two-sample concatenation is disabled for this dataset: the original
    # compared a uniform draw against -1, which never fires. A draw from
    # [0, 1) is never below 0.0 either.
    concat_prob = 0.0

    def __init__(self, split, dataset1, dataset2, sr=22050, duration=10, truncate=220000,
                 fps=21.5, drop=0.0, fix_frames=False, hop_len=256):
        super().__init__()
        self.split = _resolve_split(split)

        # Default params:
        self.min_duration = 2
        self.sr = sr
        self.duration = duration
        self.truncate = truncate
        self.fps = fps
        self.fix_frames = fix_frames
        self.hop_len = hop_len
        self.drop = drop
        print("Fix Frames: {}".format(self.fix_frames))
        print("Use Drop: {}".format(self.drop))

        assert dataset1.dataset_name == "VGGSound"
        assert dataset2.dataset_name == "Audioset"

        # Dataset1: (VGGSound)
        spec_dir1 = os.path.join(dataset1.data_dir, "mel_maa2", "npy")
        feat_dir1 = os.path.join(dataset1.data_dir, "cavp")
        names = _read_name_list(dataset1.split_txt_path, self.split)
        spec_paths = [os.path.join(spec_dir1, name) + "_mel.npy" for name in names]
        feat_paths = [os.path.join(feat_dir1, name) + ".npz" for name in names]

        # Dataset2: (Audioset), train split only. Its spectrogram files carry
        # a 'Y' prefix that the feature files do not.
        if split == "train":
            spec_dir2 = os.path.join(dataset2.data_dir, "mel")
            feat_dir2 = os.path.join(dataset2.data_dir, "cavp_renamed")
            names2 = _read_name_list(dataset2.split_txt_path, self.split)
            spec_paths += [os.path.join(spec_dir2, f'Y{name}') + "_mel.npy" for name in names2]
            feat_paths += [os.path.join(feat_dir2, name) + ".npz" for name in names2]
            names += names2

        if self.split == "Test":
            names = names[:_TEST_SUBSET_SIZE]
            spec_paths = spec_paths[:_TEST_SUBSET_SIZE]
            feat_paths = feat_paths[:_TEST_SUBSET_SIZE]
        assert len(names) == len(spec_paths) == len(feat_paths)

        self.data_list, self.spec_list, self.feat_list = _shuffle_together(
            names, spec_paths, feat_paths)
        print('Split: {} Sample Num: {}'.format(split, len(self.data_list)))

    def __len__(self):
        return len(self.data_list)

    def check(self, feat_list):
        """Scan spectrogram files and report those whose first dimension is not 80.

        Args:
            feat_list: iterable of ``.npy`` spectrogram paths.

        Returns:
            The list of offending paths (empty when all files pass).
        """
        from tqdm import tqdm

        bad_paths = []
        for spec_path in tqdm(feat_list):
            mel = np.load(spec_path).astype(np.float32)
            if mel.shape[0] != 80:
                print(f"unexpected mel shape {mel.shape}: {spec_path}", flush=True)
                bad_paths.append(spec_path)
        return bad_paths

    def load_spec_and_feat(self, spec_path, video_feat_path):
        """Load one spectrogram and its video features, both fitted to ``duration``.

        Spectrograms that cannot be read or do not have 80 rows become zeros;
        unreadable (or dropped) video features come from ``empty_vid.npz``.

        Returns:
            ``(spec_raw, video_feat)`` as float32 arrays.
        """
        spec_raw = _load_spec(spec_path, n_mels=80)
        dropped = not (np.random.uniform(0, 1) > self.drop)
        video_feat = _load_video_feat(video_feat_path, use_empty=dropped)

        spec_raw = _tile_to_length(spec_raw, self.sr * self.duration / self.hop_len, axis=1)
        video_feat = _tile_to_length(video_feat, self.fps * self.duration, axis=0)
        return spec_raw, video_feat

    def mix_audio_and_feat(self, spec1=None, spec2=None, video_feat1=None, video_feat2=None,
                           video_info_dict=None, mode='single'):
        """Crop one sample (``'single'``) or splice two samples (``'concat'``).

        ``video_info_dict`` is updated in place with ``'video_time1'`` and
        ``'video_time2'``. A new dict is created when none is passed.

        Returns:
            ``(spec, video_feat, video_info_dict)``.

        Raises:
            ValueError: for an unknown ``mode``.
        """
        if video_info_dict is None:
            video_info_dict = {}

        if mode == "single":
            start_idx = _sample_audio_start(self.sr * self.duration, self.truncate, self.fix_frames)
            spec1, video_feat1, start_frame, truncate_frame = _crop_spec_and_feat(
                spec1, video_feat1, start_idx, self.truncate, self.sr, self.fps, self.hop_len)
            video_info_dict['video_time1'] = _frame_span(start_frame, truncate_frame)
            video_info_dict['video_time2'] = ""
            return spec1, video_feat1, video_info_dict

        if mode == "concat":
            concat_spec, concat_feat, span1, span2 = _concat_spec_and_feat(
                spec1, spec2, video_feat1, video_feat2,
                self.truncate, self.sr, self.fps, self.hop_len, self.min_duration)
            video_info_dict['video_time1'] = _frame_span(*span1)
            video_info_dict['video_time2'] = _frame_span(*span2)
            return concat_spec, concat_feat, video_info_dict

        raise ValueError("unknown mode: {!r}".format(mode))

    def __getitem__(self, idx):
        """Return a dict with ``'mix_spec'``, ``'mix_video_feat'`` and ``'mix_info_dict'``."""
        audio_name1 = self.data_list[idx]

        # See the class-level note on concat_prob; the size check prevents an
        # endless partner search on a one-sample dataset.
        use_concat = random.uniform(0, 1) < self.concat_prob and len(self.data_list) > 1
        if use_concat:
            random_idx = idx
            while random_idx == idx:
                random_idx = random.randint(0, len(self.data_list) - 1)

        spec1, video_feat1 = self.load_spec_and_feat(self.spec_list[idx], self.feat_list[idx])
        if use_concat:
            spec2, video_feat2 = self.load_spec_and_feat(
                self.spec_list[random_idx], self.feat_list[random_idx])
            video_info_dict = {'audio_name1': audio_name1,
                               'audio_name2': self.data_list[random_idx]}
            mix_spec, mix_video_feat, mix_info = self.mix_audio_and_feat(
                spec1, spec2, video_feat1, video_feat2, video_info_dict, mode='concat')
        else:
            video_info_dict = {'audio_name1': audio_name1, 'audio_name2': ""}
            mix_spec, mix_video_feat, mix_info = self.mix_audio_and_feat(
                spec1=spec1, video_feat1=video_feat1,
                video_info_dict=video_info_dict, mode='single')

        return {
            'mix_spec': mix_spec,
            'mix_video_feat': mix_video_feat,
            'mix_info_dict': mix_info,
        }


class audio_video_spec_fullset_Audioset_Train(audio_video_spec_fullset_Audioset_Dataset):
    """Audioset-augmented dataset on the train split, built from a kwargs mapping."""

    def __init__(self, dataset_cfg):
        super().__init__(split='train', **dataset_cfg)


class audio_video_spec_fullset_Audioset_Valid(audio_video_spec_fullset_Audioset_Dataset):
    """Audioset-augmented dataset on the valid split, built from a kwargs mapping."""

    def __init__(self, dataset_cfg):
        super().__init__(split='valid', **dataset_cfg)


class audio_video_spec_fullset_Audioset_Test(audio_video_spec_fullset_Audioset_Dataset):
    """Audioset-augmented dataset on the test split, built from a kwargs mapping."""

    def __init__(self, dataset_cfg):
        super().__init__(split='test', **dataset_cfg)