import logging
from typing import Any
from typing import Dict
from typing import Iterator
from typing import List
from typing import Optional
from typing import Sequence
from typing import Tuple
from typing import Union

import numpy as np
import torch
from typeguard import check_argument_types

from espnet2.iterators.abs_iter_factory import AbsIterFactory
from espnet2.iterators.sequence_iter_factory import SequenceIterFactory
from espnet2.samplers.abs_sampler import AbsSampler


class ChunkIterFactory(AbsIterFactory):
    """Creates chunks from a sequence.

    Examples:
        >>> batches = [["id1"], ["id2"], ...]
        >>> batch_size = 128
        >>> chunk_length = 1000
        >>> iter_factory = ChunkIterFactory(dataset, batch_size, batches, chunk_length)
        >>> it = iter_factory.build_iter(epoch)
        >>> for ids, batch in it:
        ...     ...

    - The number of mini-batches varies from epoch to epoch and can't be
      obtained in advance, because this IterFactory is not given the length
      information.
    - For the same reason, "num_iters_per_epoch" can't be implemented for
      this iterator; "num_samples_per_epoch" is implemented instead.

    """

    def __init__(
        self,
        dataset,
        batch_size: int,
        batches: Union[AbsSampler, Sequence[Sequence[Any]]],
        chunk_length: Union[int, str],
        chunk_shift_ratio: float = 0.5,
        num_cache_chunks: int = 1024,
        num_samples_per_epoch: Optional[int] = None,
        seed: int = 0,
        shuffle: bool = False,
        num_workers: int = 0,
        collate_fn=None,
        pin_memory: bool = False,
    ):
        assert check_argument_types()
        assert all(len(x) == 1 for x in batches), "batch-size must be 1"

        # With one sample per batch, "num_iters_per_epoch" of the underlying
        # factory equals the number of samples per epoch
        self.per_sample_iter_factory = SequenceIterFactory(
            dataset=dataset,
            batches=batches,
            num_iters_per_epoch=num_samples_per_epoch,
            seed=seed,
            shuffle=shuffle,
            num_workers=num_workers,
            collate_fn=collate_fn,
            pin_memory=pin_memory,
        )

        self.num_cache_chunks = max(num_cache_chunks, batch_size)
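        # "chunk_length" may be given as an int (a fixed length) or as a
        # string such as "100,200,300" (comma-separated candidates) or
        # "100-200" (an inclusive range of candidates)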
        if isinstance(chunk_length, str):
            if len(chunk_length) == 0:
                raise ValueError(
                    "Expected e.g. '5,8' or '3-5', but got an empty string"
                )

            self.chunk_lengths = []
            for x in chunk_length.split(","):
                try:
                    sps = list(map(int, x.split("-")))
                except ValueError:
                    raise ValueError(
                        f"Expected e.g. '5,8' or '3-5', but got {chunk_length}"
                    )

                if len(sps) > 2:
                    raise ValueError(
                        f"Expected e.g. '5,8' or '3-5', but got {chunk_length}"
                    )
                elif len(sps) == 2:
                    # Append all numbers between the range into the candidates
                    self.chunk_lengths += list(range(sps[0], sps[1] + 1))
                else:
                    self.chunk_lengths += [sps[0]]
        else:
            # A single candidate: fixed chunk length
            self.chunk_lengths = [chunk_length]

        self.chunk_shift_ratio = chunk_shift_ratio
        self.batch_size = batch_size
        self.seed = seed
        self.shuffle = shuffle

    def build_iter(
        self,
        epoch: int,
        shuffle: Optional[bool] = None,
    ) -> Iterator[Tuple[List[str], Dict[str, torch.Tensor]]]:
        per_sample_loader = self.per_sample_iter_factory.build_iter(epoch, shuffle)

        if shuffle is None:
            shuffle = self.shuffle
        state = np.random.RandomState(epoch + self.seed)

        # NOTE(kamo):
        #   This iterator supports multiple chunk lengths and
        #   caches chunks for each length here, keyed by the chunk length,
        #   until the specified number has been collected
        cache_chunks_dict = {}
        cache_id_list_dict = {}
        for ids, batch in per_sample_loader:
            # Must be per-sample-loader
            assert len(ids) == 1, f"Must be per-sample-loader: {len(ids)}"
            assert all(len(x) == 1 for x in batch.values())

            # Get keys of sequence data
            sequence_keys = []
            for key in batch:
                if key + "_lengths" in batch:
                    sequence_keys.append(key)
            # Remove lengths data and get the first sample
            batch = {k: v[0] for k, v in batch.items() if not k.endswith("_lengths")}
            id_ = ids[0]

            for key in sequence_keys:
                if len(batch[key]) != len(batch[sequence_keys[0]]):
                    raise RuntimeError(
                        f"All sequences must have the same length: "
                        f"{len(batch[key])} != {len(batch[sequence_keys[0]])}"
                    )

            L = len(batch[sequence_keys[0]])
            # Select chunk length
            chunk_lengths = [lg for lg in self.chunk_lengths if lg < L]
            if len(chunk_lengths) == 0:
                logging.warning(
                    f"The length of '{id_}' is {L}, which is shorter than "
                    f"all candidate chunk lengths: {self.chunk_lengths}"
                )
                continue

            W = int(state.choice(chunk_lengths, 1))
            cache_id_list = cache_id_list_dict.setdefault(W, [])
            cache_chunks = cache_chunks_dict.setdefault(W, {})

            # Shift width to the next chunk
            S = int(W * self.chunk_shift_ratio)
            # Number of chunks
            N = (L - W) // S + 1
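            # A random offset Z in [0, (L - W) % S] varies which residual
            # frames get discarded from epoch to epoch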
            if shuffle:
                Z = state.randint(0, (L - W) % S + 1)
            else:
                Z = 0

            # Split a sequence into chunks.
            # Note that leftover frames which don't fill a whole chunk are discarded
            for k, v in batch.items():
                if k not in cache_chunks:
                    cache_chunks[k] = []
                if k in sequence_keys:
                    # Shift chunks with overlapped length for data augmentation
                    cache_chunks[k] += [v[Z + i * S : Z + i * S + W] for i in range(N)]
                else:
                    # If not sequence, use whole data instead of chunk
                    cache_chunks[k] += [v for _ in range(N)]
            cache_id_list += [id_ for _ in range(N)]

            # Once enough chunks of this length are cached, emit mini-batches;
            # "yield from" captures the leftovers returned by the generator
            if len(cache_id_list) > self.num_cache_chunks:
                cache_id_list, cache_chunks = yield from self._generate_mini_batches(
                    cache_id_list,
                    cache_chunks,
                    shuffle,
                    state,
                )

            cache_id_list_dict[W] = cache_id_list
            cache_chunks_dict[W] = cache_chunks

        else:
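            # The "else" of the for-loop runs once the per-sample loader is
            # exhausted: flush the chunks still cached for every length W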
            for W in cache_id_list_dict:
                cache_id_list = cache_id_list_dict.setdefault(W, [])
                cache_chunks = cache_chunks_dict.setdefault(W, {})

                yield from self._generate_mini_batches(
                    cache_id_list,
                    cache_chunks,
                    shuffle,
                    state,
                )

    def _generate_mini_batches(
        self,
        id_list: List[str],
        batches: Dict[str, List[torch.Tensor]],
        shuffle: bool,
        state: np.random.RandomState,
    ):
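        """Yield mini-batches of "batch_size" items and return the leftovers.

        The returned (id_list, batches) hold the items that didn't fill a
        full mini-batch; the caller receives them via "yield from".
        """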
        if shuffle:
            indices = np.arange(0, len(id_list))
            state.shuffle(indices)
            batches = {k: [v[i] for i in indices] for k, v in batches.items()}
            id_list = [id_list[i] for i in indices]

        bs = self.batch_size
        while len(id_list) >= bs:
            # Make mini-batch and yield
            yield (
                id_list[:bs],
                {k: torch.stack(v[:bs], 0) for k, v in batches.items()},
            )
            id_list = id_list[bs:]
            batches = {k: v[bs:] for k, v in batches.items()}

        # Return the leftovers smaller than batch_size; the caller retrieves
        # them as the value of the "yield from" expression
        return id_list, batches