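# NOTE: the following is residue from the web file viewer this file was copied from,
# kept as comments so the module remains valid Python:
#   cyun9286's picture / Add application file / f53b39e / raw / history blame / 7.41 kB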
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
import logging
import math
from typing import Callable, Iterable, List, Optional, Sequence
import torch
from torch.utils.data import BatchSampler, DataLoader, Dataset, IterableDataset, Subset
from torch.utils.data.distributed import DistributedSampler
class MixedDataLoader:
    """
    Iterator that draws each batch from one of several dataloaders, chosen at
    random according to per-loader mixing probabilities.

    The object is its own iterator: `__iter__` resets the per-iteration state
    and returns `self`, and `__next__` yields one item from a sampled loader.
    """

    def __init__(self, dataloaders: List[DataLoader], mixing_prob: torch.FloatTensor):
        """
        Args:
            dataloaders (List[DataLoader]): Loaders to interleave.
            mixing_prob (torch.FloatTensor): One sampling weight per loader; must
                have as many entries as there are loaders.
        """
        assert len(dataloaders) == mixing_prob.shape[0]
        self.dataloaders = dataloaders
        self.mixing_prob = mixing_prob
        # Per-iteration state; stays None until __iter__ has been called.
        self._iter_dls = None
        self._iter_mixing_prob = None
        self.random_generator = torch.Generator()

    def __len__(self):
        """Total number of batches, i.e. the sum of the lengths of all loaders."""
        return sum(len(loader) for loader in self.dataloaders)

    def __iter__(self):
        """Start a fresh pass over all loaders and return `self`."""
        # Fixed seed: every pass (and every process running this code) draws the
        # same sequence of loader indices.
        self.random_generator.manual_seed(42)
        self._iter_dls = list(map(iter, self.dataloaders))
        # Work on a copy so that zeroing exhausted loaders never touches the
        # probabilities supplied by the caller.
        self._iter_mixing_prob = self.mixing_prob.clone()
        return self

    def __next__(self):
        """
        Return the next item from a randomly selected loader.

        A loader that runs out has its probability set to zero and another one is
        drawn; once every probability is zero the iteration ends.

        Raises:
            TypeError: If called before `__iter__`.
            StopIteration: When no loader with non-zero probability remains.
        """
        if self._iter_dls is None:
            raise TypeError(f"{type(self).__name__} object is not an iterator")

        remaining = self._iter_mixing_prob
        while True:
            if not remaining.any():
                # Every loader has been exhausted (or had zero probability).
                raise StopIteration
            picked = remaining.multinomial(
                1, generator=self.random_generator
            ).item()
            try:
                return next(self._iter_dls[picked])
            except StopIteration:
                # This loader is done: stop sampling it and draw again.
                remaining[picked] = 0
            except Exception as err:
                # Anything else is unexpected: log it and propagate.
                logging.error(err)
                raise err
class TorchTrainMixedDataset:
    """
    Factory that builds, for a given epoch, one `DataLoader` per dataset and
    wraps them in a `MixedDataLoader` that samples between them.
    """

    # Absolute tolerance when checking that the mixing probabilities sum to 1.
    # The probabilities are stored as a float32 tensor, so their sum is rarely
    # bit-exact even for valid inputs.
    _PROB_SUM_ATOL = 1e-5

    def __init__(
        self,
        datasets: List[Dataset],
        batch_sizes: List[int],
        num_workers: int,
        shuffle: bool,
        pin_memory: bool,
        drop_last: bool,
        collate_fn: Optional[Callable] = None,
        worker_init_fn: Optional[Callable] = None,
        phases_per_epoch: int = 1,
        dataset_prob: Optional[List[float]] = None,
    ) -> None:
        """
        Args:
            datasets (List[Dataset]): List of Datasets to be mixed. Must be non-empty
                and must not contain `IterableDataset` instances.
            batch_sizes (List[int]): Batch sizes for each dataset in the list.
            num_workers (int): Number of workers per dataloader.
            shuffle (bool): Whether or not to shuffle data.
            pin_memory (bool): If True, use pinned memory when loading tensors from disk.
            drop_last (bool): Whether or not to drop the last batch of data.
            collate_fn (Callable): Function to merge a list of samples into a mini-batch.
            worker_init_fn (Callable): Function to init each dataloader worker.
            phases_per_epoch (int): Number of phases per epoch. When greater than 1,
                each dataset is split into that many chunks and `get_loader`
                serves one chunk per call.
            dataset_prob (List[float]): Probability of choosing the dataloader to
                sample from. Should sum to 1.0 (within floating-point tolerance).
                If omitted, each dataset gets a probability proportional to its
                number of batches.

        Raises:
            AssertionError: If `datasets` is empty, contains an `IterableDataset`,
                `dataset_prob` has the wrong length, or the probabilities do not
                sum to 1.0.
        """
        self.datasets = datasets
        self.batch_sizes = batch_sizes
        self.num_workers = num_workers
        self.shuffle = shuffle
        self.pin_memory = pin_memory
        self.drop_last = drop_last
        self.collate_fn = collate_fn
        self.worker_init_fn = worker_init_fn
        assert len(self.datasets) > 0
        for dataset in self.datasets:
            assert not isinstance(dataset, IterableDataset), "Not supported"
            # `RepeatFactorWrapper` requires calling set_epoch first to get its length
            self._set_dataset_epoch(dataset, 0)
        self.phases_per_epoch = phases_per_epoch
        # Per-dataset cache of index chunks, filled lazily by `get_loader`.
        self.chunks = [None] * len(datasets)
        if dataset_prob is None:
            # If not provided, assign each dataset a probability proportional to its
            # length, measured in batches.
            dataset_lens = [
                (math.floor(len(d) / bs) if drop_last else math.ceil(len(d) / bs))
                for d, bs in zip(datasets, batch_sizes)
            ]
            total_len = sum(dataset_lens)
            dataset_prob = torch.tensor([d_len / total_len for d_len in dataset_lens])
        else:
            assert len(dataset_prob) == len(datasets)
            dataset_prob = torch.tensor(dataset_prob)
        logging.info(f"Dataset mixing probabilities: {dataset_prob.tolist()}")
        # Compare with a tolerance: an exact `== 1.0` check rejects valid
        # probabilities whose float32 sum is off by a rounding error.
        prob_total = dataset_prob.sum().item()
        assert math.isclose(
            prob_total, 1.0, abs_tol=self._PROB_SUM_ATOL
        ), "Probabilities should sum to 1.0"
        self.dataset_prob = dataset_prob

    def _set_dataset_epoch(self, dataset, epoch: int) -> None:
        """Propagate `epoch` to `dataset` via its `epoch` attribute and/or
        `set_epoch` method, whichever of the two it exposes."""
        if hasattr(dataset, "epoch"):
            dataset.epoch = epoch
        if hasattr(dataset, "set_epoch"):
            dataset.set_epoch(epoch)

    def get_loader(self, epoch: int) -> Iterable:
        """
        Build the mixed loader for `epoch`.

        With `phases_per_epoch > 1`, `epoch` counts phases: the dataset-level
        epoch is `epoch // phases_per_epoch` and the phase within it is
        `epoch % phases_per_epoch`. Each dataset is randomly permuted (seeded by
        the dataset-level epoch) and split into `phases_per_epoch` chunks; this
        call serves the chunk of the current phase.

        Args:
            epoch (int): Epoch (or phase) index.

        Returns:
            Iterable: A `MixedDataLoader` over one `DataLoader` per dataset.
        """
        dataloaders = []
        for d_idx, (dataset, batch_size) in enumerate(
            zip(self.datasets, self.batch_sizes)
        ):
            if self.phases_per_epoch > 1:
                # Major epoch that loops over the entire dataset
                # len(main_epoch) == phases_per_epoch * len(epoch)
                main_epoch = epoch // self.phases_per_epoch
                # Phase within the main epoch
                local_phase = epoch % self.phases_per_epoch
                # Start of a new data-epoch, or the job was resumed after preemption
                # (in which case the chunk cache is still empty).
                if local_phase == 0 or self.chunks[d_idx] is None:
                    # Set seed for dataset epoch.
                    # If using RepeatFactorWrapper, this step correctly re-samples
                    # indices before chunking.
                    self._set_dataset_epoch(dataset, main_epoch)
                    # Separate random generator for subset sampling, seeded by the
                    # main epoch so a resumed job rebuilds the same chunks.
                    g = torch.Generator()
                    g.manual_seed(main_epoch)
                    # NOTE(review): torch.chunk may return fewer chunks than
                    # requested for very small datasets -- confirm that
                    # len(dataset) is large relative to phases_per_epoch.
                    self.chunks[d_idx] = torch.chunk(
                        torch.randperm(len(dataset), generator=g),
                        self.phases_per_epoch,
                    )
                dataset = Subset(dataset, self.chunks[d_idx][local_phase])
            else:
                self._set_dataset_epoch(dataset, epoch)
            # No explicit num_replicas/rank is passed, so the sampler takes them
            # from the current distributed process group.
            sampler = DistributedSampler(dataset, shuffle=self.shuffle)
            sampler.set_epoch(epoch)
            batch_sampler = BatchSampler(sampler, batch_size, drop_last=self.drop_last)
            dataloaders.append(
                DataLoader(
                    dataset,
                    num_workers=self.num_workers,
                    pin_memory=self.pin_memory,
                    batch_sampler=batch_sampler,
                    collate_fn=self.collate_fn,
                    worker_init_fn=self.worker_init_fn,
                )
            )
        return MixedDataLoader(dataloaders, self.dataset_prob)