from utils.distributed import is_main_process, get_rank, get_world_size
import logging
import torch.distributed as dist
import torch
import io
import os
import json
import re
import random
import numpy as np
from os.path import join
from tqdm import trange
from PIL import Image
from PIL import ImageFile
from torchvision.transforms import PILToTensor
import librosa
import torchaudio
# import soundfile as sf

ImageFile.LOAD_TRUNCATED_IMAGES = True
Image.MAX_IMAGE_PIXELS = None

logger = logging.getLogger(__name__)


def load_audio_from_path(audio_path, client, sr, audio_reader_type, max_length=0):
    if "s3://" in audio_path and client is not None:
        audio_bytes = client.get(audio_path)
        buff = io.BytesIO(audio_bytes)
    else:
        buff = audio_path
    if audio_reader_type == 'librosa':
        audio, _ = librosa.load(buff, sr=sr)
        audio = torch.from_numpy(audio)
        # audio = normalize(audio)  # not needed: librosa.load already returns float32 in [-1, 1]
    # elif audio_reader_type == 'soundfile':
    #     audio, _ = sf.read(buff, sr=sr)
    #     audio = torch.from_numpy(audio)
    elif audio_reader_type == 'torchaudio':
        torchaudio.set_audio_backend('soundfile')  # for flac files
        audio, csr = torchaudio.load(buff)
        if csr != sr:
            trans = torchaudio.transforms.Resample(csr, sr)
            audio = trans(audio)
        if audio.size(0) == 2:  # downmix stereo to mono
            audio = torch.mean(audio, dim=0, keepdim=False)
    else:
        raise NotImplementedError
    if max_length != 0:
        # if the audio is longer than max_length, randomly crop it to max_length
        if audio.shape[0] >= max_length:
            max_start = audio.shape[0] - max_length
            start = random.randint(0, max_start)
            audio = audio[start: start + max_length]
        else:
            # otherwise right-pad with zeros up to max_length
            audio = torch.nn.functional.pad(audio, (0, max_length - audio.shape[-1]), 'constant')
    if len(audio.shape) == 1:
        audio = audio.unsqueeze(0)
    # rescale from [-1, 1] float to the 16-bit PCM range expected by kaldi fbank;
    # note the mel features are computed assuming 16 kHz input regardless of `sr`
    fbank = audio * 2 ** 15
    fbank = torchaudio.compliance.kaldi.fbank(
        fbank, num_mel_bins=64, sample_frequency=16000, frame_length=25, frame_shift=10)
    # normalize with precomputed dataset statistics
    fbank_mean = 15.41663
    fbank_std = 6.55582
    fbank = (fbank - fbank_mean) / (fbank_std * 2)
    return fbank  # (998, 64) for 10 s of 16 kHz audio


def load_image_from_path(image_path, client):
    if "s3://" in image_path and client is not None:
        value = client.Get(image_path)
        if value is None:
            logger.warning(f"Failed to load {image_path}")
        img_bytes = np.frombuffer(value, dtype=np.uint8)
        buff = io.BytesIO(img_bytes)
        image = Image.open(buff).convert('RGB')
    else:
        image = Image.open(image_path).convert('RGB')  # PIL Image
    image = PILToTensor()(image).unsqueeze(0)  # (1, C, H, W), torch.uint8
    return image
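

# Usage sketch for the two loaders above (illustrative only; "sample.wav",
# "sample.jpg", and the 16 kHz / 10 s settings are assumptions, not part of
# this module):
#
#     fbank = load_audio_from_path("sample.wav", client=None, sr=16000,
#                                  audio_reader_type="torchaudio",
#                                  max_length=16000 * 10)
#     # fbank: (998, 64) tensor of normalized log-mel features
#
#     image = load_image_from_path("sample.jpg", client=None)
#     # image: (1, 3, H, W) uint8 tensor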


def load_anno(ann_file_list):
    """Load annotation files and prefix each media path with its data root.

    Args:
        ann_file_list (dict or List[dict]): a single annotation config
            (automatically wrapped in a list) or a list of them. Each config
            is an attribute-style dict providing `anno_path`, `data_root`,
            `media_type` ("image", "video", "audio", or "audio_video"), and
            optionally `data_root_prefix`.

    Returns:
        List[dict]: each dict is
            {
                "image": str or List[str],  # full media path(s)
                "caption": str or List[str],  # caption text string(s)
            }
    """
    if isinstance(ann_file_list, dict):
        ann_file_list = [ann_file_list]
    ann = []
    for d in ann_file_list:
        data_root = d.data_root
        data_root_prefix = d.get("data_root_prefix", "")
        fp = d.anno_path
        cur_ann = json.load(open(fp, "r"))
        iterator = trange(len(cur_ann), desc=f"Loading {fp}") \
            if is_main_process() else range(len(cur_ann))
        for idx in iterator:
            if d.media_type == "image":
                key = "image"
            elif d.media_type in ["video", "audio_video"]:
                key = "video"
            elif d.media_type == "audio":
                key = "audio"
            else:
                raise NotImplementedError(d.media_type)
            # unify all media types under the same "image" key for the data path
            if isinstance(cur_ann[idx][key], str):
                cur_ann[idx]["image"] = data_root_prefix + join(data_root, cur_ann[idx][key])
            else:  # list of paths
                cur_ann[idx]["image"] = [
                    data_root_prefix + join(data_root, e) for e in cur_ann[idx][key]]
        ann += cur_ann
    return ann


def pre_text(text, max_l=None):
    assert type(text) is str, text
    text = re.sub(r"([,.'!?\"()*#:;~])", '', text.lower())
    text = text.replace('-', ' ').replace('/', ' ').replace('<person>', 'person')
    text = re.sub(r"\s{2,}", ' ', text)
    text = text.rstrip('\n').strip(' ')

    if max_l:  # truncate to at most max_l words
        words = text.split(' ')
        if len(words) > max_l:
            text = ' '.join(words[:max_l])
    return text


def collect_result(result, result_dir, filename, is_json=True, is_list=True):
    if is_json:
        result_file = os.path.join(
            result_dir, '%s_rank%d.json' % (filename, get_rank()))
        json.dump(result, open(result_file, 'w'))
    else:
        result_file = os.path.join(
            result_dir, '%s_rank%d.pth' % (filename, get_rank()))
        torch.save(result, result_file)

    dist.barrier()

    result = None
    if is_main_process():
        # combine results from all processes
        if is_list:
            result = []
        else:
            result = {}
        for rank in range(get_world_size()):
            if is_json:
                result_file = os.path.join(
                    result_dir, '%s_rank%d.json' % (filename, rank))
                res = json.load(open(result_file, 'r'))
            else:
                result_file = os.path.join(
                    result_dir, '%s_rank%d.pth' % (filename, rank))
                res = torch.load(result_file)
            if is_list:
                result += res
            else:
                result.update(res)
    return result
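

# Usage sketch for collect_result (illustrative; the prediction shard and the
# file names below are assumptions, not part of this module). Every rank
# writes its own shard, then rank 0 reads and merges all shards:
#
#     shard = [{"id": i, "pred": p} for i, p in enumerate(rank_preds)]  # hypothetical
#     merged = collect_result(shard, "results", "val_preds", is_json=True, is_list=True)
#     if is_main_process():
#         ...  # `merged` holds all ranks' items; non-main ranks receive None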


def sync_save_result(result, result_dir, filename, is_json=True, is_list=True):
    """Gather results from multiple GPUs and save the merged result on rank 0."""
    if is_json:
        result_file = os.path.join(
            result_dir, "dist_res", '%s_rank%d.json' % (filename, get_rank()))
        final_result_file = os.path.join(result_dir, '%s.json' % filename)
        os.makedirs(os.path.dirname(result_file), exist_ok=True)
        json.dump(result, open(result_file, 'w'))
    else:
        result_file = os.path.join(
            result_dir, "dist_res", '%s_rank%d.pth' % (filename, get_rank()))
        os.makedirs(os.path.dirname(result_file), exist_ok=True)
        final_result_file = os.path.join(result_dir, '%s.pth' % filename)
        torch.save(result, result_file)

    dist.barrier()

    if is_main_process():
        # combine results from all processes
        if is_list:
            result = []
        else:
            result = {}
        for rank in range(get_world_size()):
            if is_json:
                result_file = os.path.join(
                    result_dir, "dist_res", '%s_rank%d.json' % (filename, rank))
                res = json.load(open(result_file, 'r'))
            else:
                result_file = os.path.join(
                    result_dir, "dist_res", '%s_rank%d.pth' % (filename, rank))
                res = torch.load(result_file)
            if is_list:
                result += res
            else:
                result.update(res)
        if is_json:
            json.dump(result, open(final_result_file, 'w'))
        else:
            torch.save(result, final_result_file)
        logger.info('result file saved to %s' % final_result_file)
    dist.barrier()
    return final_result_file, result


def pad_sequences_1d(sequences, dtype=torch.long, device=torch.device("cpu"), fixed_length=None):
    """
    Pad a single-nested list or a sequence of n-d arrays (torch.Tensor or np.ndarray)
    into an (n+1)-d array; only the first dim is allowed to have variable lengths.
    Args:
        sequences: list(n-d tensor or list)
        dtype: np.dtype or torch.dtype
        device: torch.device, only used for torch dtypes
        fixed_length: pad all seqs in sequences to this fixed length; every seq
            must then have a length <= fixed_length.
            The return will be of shape [len(sequences), fixed_length, ...]
    Returns:
        padded_seqs: ((n+1)-d tensor) padded with zeros
        mask: (2d tensor) of the same shape as the first two dims of padded_seqs,
            1 indicates valid, 0 otherwise
    Examples:
        >>> test_data_list = [[1, 2, 3], [1, 2], [3, 4, 7, 9]]
        >>> pad_sequences_1d(test_data_list, dtype=torch.long)
        >>> test_data_3d = [torch.randn(2, 3, 4), torch.randn(4, 3, 4), torch.randn(1, 3, 4)]
        >>> pad_sequences_1d(test_data_3d, dtype=torch.float)
        >>> test_data_list = [[1, 2, 3], [1, 2], [3, 4, 7, 9]]
        >>> pad_sequences_1d(test_data_list, dtype=np.float32)
        >>> test_data_3d = [np.random.randn(2, 3, 4), np.random.randn(4, 3, 4), np.random.randn(1, 3, 4)]
        >>> pad_sequences_1d(test_data_3d, dtype=np.float32)
    """
    if isinstance(sequences[0], list):
        if "torch" in str(dtype):
            sequences = [torch.tensor(s, dtype=dtype, device=device) for s in sequences]
        else:
            sequences = [np.asarray(s, dtype=dtype) for s in sequences]

    extra_dims = sequences[0].shape[1:]  # the extra dims should be the same for all elements
    lengths = [len(seq) for seq in sequences]
    if fixed_length is not None:
        max_length = fixed_length
    else:
        max_length = max(lengths)
    if isinstance(sequences[0], torch.Tensor):
        assert "torch" in str(dtype), "dtype and input type does not match"
        padded_seqs = torch.zeros((len(sequences), max_length) + extra_dims, dtype=dtype, device=device)
        mask = torch.zeros((len(sequences), max_length), dtype=torch.float32, device=device)
    else:  # np
        assert "numpy" in str(dtype), "dtype and input type does not match"
        padded_seqs = np.zeros((len(sequences), max_length) + extra_dims, dtype=dtype)
        mask = np.zeros((len(sequences), max_length), dtype=np.float32)

    for idx, seq in enumerate(sequences):
        end = lengths[idx]
        padded_seqs[idx, :end] = seq
        mask[idx, :end] = 1
    return padded_seqs, mask  # , lengths
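

# Minimal smoke test (an added sketch, not part of the original module). It
# exercises pad_sequences_1d and pre_text with literal inputs so the expected
# shapes and strings can be asserted directly.
if __name__ == "__main__":
    padded, mask = pad_sequences_1d([[1, 2, 3], [1, 2], [3, 4, 7, 9]], dtype=torch.long)
    assert padded.shape == (3, 4) and mask.sum().item() == 9  # longest seq has 4 items
    assert pre_text("Hello,  WORLD!") == "hello world"  # lowercased, punctuation stripped
    print("pad_sequences_1d / pre_text smoke test passed")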