|
from typing import Iterator |
|
from typing import List |
|
from typing import Sequence |
|
from typing import Tuple |
|
from typing import Union |
|
|
|
from typeguard import check_argument_types |
|
|
|
from espnet2.fileio.read_text import load_num_sequence_text |
|
from espnet2.fileio.read_text import read_2column_text |
|
from espnet2.samplers.abs_sampler import AbsSampler |
|
|
|
|
|
class FoldedBatchSampler(AbsSampler): |
|
def __init__( |
|
self, |
|
batch_size: int, |
|
shape_files: Union[Tuple[str, ...], List[str]], |
|
fold_lengths: Sequence[int], |
|
min_batch_size: int = 1, |
|
sort_in_batch: str = "descending", |
|
sort_batch: str = "ascending", |
|
drop_last: bool = False, |
|
utt2category_file: str = None, |
|
): |
|
assert check_argument_types() |
|
assert batch_size > 0 |
|
if sort_batch != "ascending" and sort_batch != "descending": |
|
raise ValueError( |
|
f"sort_batch must be ascending or descending: {sort_batch}" |
|
) |
|
if sort_in_batch != "descending" and sort_in_batch != "ascending": |
|
raise ValueError( |
|
f"sort_in_batch must be ascending or descending: {sort_in_batch}" |
|
) |
|
|
|
self.batch_size = batch_size |
|
self.shape_files = shape_files |
|
self.sort_in_batch = sort_in_batch |
|
self.sort_batch = sort_batch |
|
self.drop_last = drop_last |
|
|
|
|
|
|
|
|
|
utt2shapes = [ |
|
load_num_sequence_text(s, loader_type="csv_int") for s in shape_files |
|
] |
|
|
|
first_utt2shape = utt2shapes[0] |
|
for s, d in zip(shape_files, utt2shapes): |
|
if set(d) != set(first_utt2shape): |
|
raise RuntimeError( |
|
f"keys are mismatched between {s} != {shape_files[0]}" |
|
) |
|
|
|
|
|
|
|
keys = sorted(first_utt2shape, key=lambda k: first_utt2shape[k][0]) |
|
if len(keys) == 0: |
|
raise RuntimeError(f"0 lines found: {shape_files[0]}") |
|
|
|
category2utt = {} |
|
if utt2category_file is not None: |
|
utt2category = read_2column_text(utt2category_file) |
|
if set(utt2category) != set(first_utt2shape): |
|
raise RuntimeError( |
|
"keys are mismatched between " |
|
f"{utt2category_file} != {shape_files[0]}" |
|
) |
|
for k in keys: |
|
category2utt.setdefault(utt2category[k], []).append(k) |
|
else: |
|
category2utt["default_category"] = keys |
|
|
|
self.batch_list = [] |
|
for d, v in category2utt.items(): |
|
category_keys = v |
|
|
|
start = 0 |
|
batch_sizes = [] |
|
while True: |
|
k = category_keys[start] |
|
factor = max(int(d[k][0] / m) for d, m in zip(utt2shapes, fold_lengths)) |
|
bs = max(min_batch_size, int(batch_size / (1 + factor))) |
|
if self.drop_last and start + bs > len(category_keys): |
|
|
|
if len(self.batch_list) > 0: |
|
break |
|
|
|
bs = min(len(category_keys) - start, bs) |
|
batch_sizes.append(bs) |
|
start += bs |
|
if start >= len(category_keys): |
|
break |
|
|
|
if len(batch_sizes) == 0: |
|
|
|
raise RuntimeError("0 batches") |
|
|
|
|
|
|
|
if len(batch_sizes) > 1 and batch_sizes[-1] < min_batch_size: |
|
for i in range(batch_sizes.pop(-1)): |
|
batch_sizes[-(i % len(batch_sizes)) - 2] += 1 |
|
|
|
if not self.drop_last: |
|
|
|
assert sum(batch_sizes) == len( |
|
category_keys |
|
), f"{sum(batch_sizes)} != {len(category_keys)}" |
|
|
|
|
|
cur_batch_list = [] |
|
start = 0 |
|
for bs in batch_sizes: |
|
assert len(category_keys) >= start + bs, "Bug" |
|
minibatch_keys = category_keys[start : start + bs] |
|
start += bs |
|
if sort_in_batch == "descending": |
|
minibatch_keys.reverse() |
|
elif sort_in_batch == "ascending": |
|
|
|
pass |
|
else: |
|
raise ValueError( |
|
"sort_in_batch must be ascending or " |
|
f"descending: {sort_in_batch}" |
|
) |
|
cur_batch_list.append(tuple(minibatch_keys)) |
|
|
|
if sort_batch == "ascending": |
|
pass |
|
elif sort_batch == "descending": |
|
cur_batch_list.reverse() |
|
else: |
|
raise ValueError( |
|
f"sort_batch must be ascending or descending: {sort_batch}" |
|
) |
|
self.batch_list.extend(cur_batch_list) |
|
|
|
def __repr__(self): |
|
return ( |
|
f"{self.__class__.__name__}(" |
|
f"N-batch={len(self)}, " |
|
f"batch_size={self.batch_size}, " |
|
f"shape_files={self.shape_files}, " |
|
f"sort_in_batch={self.sort_in_batch}, " |
|
f"sort_batch={self.sort_batch})" |
|
) |
|
|
|
def __len__(self): |
|
return len(self.batch_list) |
|
|
|
def __iter__(self) -> Iterator[Tuple[str, ...]]: |
|
return iter(self.batch_list) |
|
|