from collections.abc import Sequence, Mapping
from typing import List, Dict, Union, Any
import re
import collections.abc as container_abcs

import torch
import treetensor.torch as ttorch

from ding.compatibility import torch_ge_131

int_classes = int
string_classes = (str, bytes)
# numpy dtype kinds that cannot be converted to tensors: bytes (S, a), unicode strings (U) and objects (O)
np_str_obj_array_pattern = re.compile(r'[SaUO]')

default_collate_err_msg_format = (
    "default_collate: batch must contain tensors, numpy arrays, numbers, "
    "dicts or lists; found {}"
)


def ttorch_collate(x, json: bool = False, cat_1dim: bool = True):
    """
    Overview:
        Collates a list of tensors or nested dictionaries of tensors into a single tensor or nested \
        dictionary of tensors.

    Arguments:
        - x : The input list of tensors or nested dictionaries of tensors.
        - json (:obj:`bool`): If True, converts the collated treetensor into a plain nested dict. \
            Defaults to False.
        - cat_1dim (:obj:`bool`): If True, squeezes tensors of shape ``(B, 1)`` to shape ``(B, )``. \
            Defaults to True.

    Returns:
        The collated output tensor or nested dictionary of tensors.

    Examples:
        >>> # case 1: collate a list of tensors
        >>> tensors = [torch.tensor([1, 2, 3]), torch.tensor([4, 5, 6]), torch.tensor([7, 8, 9])]
        >>> collated = ttorch_collate(tensors)
        collated = torch.tensor([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
        >>> # case 2: collate a nested dictionary of tensors
        >>> nested_dict = {
                'a': torch.tensor([1, 2, 3]),
                'b': torch.tensor([4, 5, 6]),
                'c': torch.tensor([7, 8, 9])
            }
        >>> collated = ttorch_collate(nested_dict)
        collated = {
            'a': torch.tensor([1, 2, 3]),
            'b': torch.tensor([4, 5, 6]),
            'c': torch.tensor([7, 8, 9])
        }
        >>> # case 3: collate a list of nested dictionaries of tensors
        >>> nested_dicts = [
                {'a': torch.tensor([1, 2, 3]), 'b': torch.tensor([4, 5, 6])},
                {'a': torch.tensor([7, 8, 9]), 'b': torch.tensor([10, 11, 12])}
            ]
        >>> collated = ttorch_collate(nested_dicts)
        collated = {
            'a': torch.tensor([[1, 2, 3], [7, 8, 9]]),
            'b': torch.tensor([[4, 5, 6], [10, 11, 12]])
        }
    """
    def inplace_fn(t):
        for k in t.keys():
            if isinstance(t[k], torch.Tensor):
                if len(t[k].shape) == 2 and t[k].shape[1] == 1:
                    t[k] = t[k].squeeze(-1)
            else:
                inplace_fn(t[k])

    x = ttorch.stack(x)
    if cat_1dim:
        inplace_fn(x)
    if json:
        x = x.json()
    return x


def default_collate(batch: Sequence,
                    cat_1dim: bool = True,
                    ignore_prefix: list = ['collate_ignore']) -> Union[torch.Tensor, Mapping, Sequence]:
    """
    Overview:
        Put each data field into a tensor with outer dimension batch size.

    Arguments:
        - batch (:obj:`Sequence`): A data sequence, whose length is batch size, whose element is one piece of data.
        - cat_1dim (:obj:`bool`): Whether to concatenate a batch of ``(1, )``-shaped tensors to shape ``(B, )`` \
            rather than stacking to ``(B, 1)``, defaults to True.
        - ignore_prefix (:obj:`list`): A list of prefixes; dict keys starting with any of these prefixes are \
            not collated, defaults to ['collate_ignore'].

    Returns:
        - ret (:obj:`Union[torch.Tensor, Mapping, Sequence]`): The collated data, with the batch size applied \
            to each data field. The return dtype depends on the original element dtype and can be a \
            torch.Tensor, Mapping or Sequence.

    Example:
        >>> # a list with B tensors shaped (m, n) -->> a tensor shaped (B, m, n)
        >>> a = [torch.zeros(2, 3) for _ in range(4)]
        >>> default_collate(a).shape
        torch.Size([4, 2, 3])
        >>>
        >>> # a list with B lists, each list contains m elements -->> a list of m tensors, each with shape (B, )
        >>> a = [[0 for __ in range(3)] for _ in range(4)]
        >>> default_collate(a)
        [tensor([0, 0, 0, 0]), tensor([0, 0, 0, 0]), tensor([0, 0, 0, 0])]
        >>>
        >>> # a list with B dicts, whose values are tensors shaped :math:`(m, n)` -->>
        >>> # a dict whose values are tensors with shape :math:`(B, m, n)`
        >>> a = [{i: torch.zeros(i, i + 1) for i in range(2, 4)} for _ in range(4)]
        >>> print(a[0][2].shape, a[0][3].shape)
        torch.Size([2, 3]) torch.Size([3, 4])
        >>> b = default_collate(a)
        >>> print(b[2].shape, b[3].shape)
        torch.Size([4, 2, 3]) torch.Size([4, 3, 4])
    """
    if isinstance(batch, ttorch.Tensor):
        return batch.json()

    elem = batch[0]
    elem_type = type(elem)
    if isinstance(elem, torch.Tensor):
        out = None
        if torch_ge_131() and torch.utils.data.get_worker_info() is not None:
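            # we are in a DataLoader worker process: concatenate directly into a
            # shared memory tensor to avoid an extra copy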
            numel = sum([x.numel() for x in batch])
            storage = elem.storage()._new_shared(numel)
            out = elem.new(storage)
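        # a batch of (1, )-shaped tensors is concatenated to shape (B, ) rather than stacked to (B, 1)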
        if elem.shape == (1, ) and cat_1dim:
            return torch.cat(batch, 0, out=out)
        else:
            return torch.stack(batch, 0, out=out)
    elif isinstance(elem, ttorch.Tensor):
        return ttorch_collate(batch, json=True, cat_1dim=cat_1dim)
    elif elem_type.__module__ == 'numpy' and elem_type.__name__ != 'str_' \
            and elem_type.__name__ != 'string_':
        if elem_type.__name__ == 'ndarray':
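            # arrays of strings or arbitrary objects cannot be converted to tensors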
            if np_str_obj_array_pattern.search(elem.dtype.str) is not None:
                raise TypeError(default_collate_err_msg_format.format(elem.dtype))
            return default_collate([torch.as_tensor(b) for b in batch], cat_1dim=cat_1dim)
        elif elem.shape == ():  # scalars
            return torch.as_tensor(batch)
    elif isinstance(elem, float):
        return torch.tensor(batch, dtype=torch.float32)
    elif isinstance(elem, int_classes):
        dtype = torch.bool if isinstance(elem, bool) else torch.int64
        return torch.tensor(batch, dtype=dtype)
    elif isinstance(elem, string_classes):
        return batch
    elif isinstance(elem, container_abcs.Mapping):
        ret = {}
        for key in elem:
            # non-string keys (e.g. int) can never match an ignore prefix
            if isinstance(key, str) and any(key.startswith(t) for t in ignore_prefix):
                ret[key] = [d[key] for d in batch]
            else:
                ret[key] = default_collate([d[key] for d in batch], cat_1dim=cat_1dim)
        return ret
    elif isinstance(elem, tuple) and hasattr(elem, '_fields'):  # namedtuple
        return elem_type(*(default_collate(samples, cat_1dim=cat_1dim) for samples in zip(*batch)))
    elif isinstance(elem, container_abcs.Sequence):
        transposed = zip(*batch)
        return [default_collate(samples, cat_1dim=cat_1dim) for samples in transposed]

    raise TypeError(default_collate_err_msg_format.format(elem_type))


def timestep_collate(batch: List[Dict[str, Any]]) -> Dict[str, Union[torch.Tensor, list]]:
    """
    Overview:
        Collates a batch of timestepped data fields into tensors with the outer dimension being the batch size. \
        Each timestepped data field is represented as a tensor with shape ``[T, B, any_dims]``, where T is the \
        length of the sequence, B is the batch size and any_dims represents the shape of the tensor at each \
        timestep.

    Arguments:
        - batch(:obj:`List[Dict[str, Any]]`): A list of dictionaries with length B, where each dictionary \
            maps the name of a data field to a sequence of torch.Tensor objects with any shape. Each \
            dictionary is also expected to contain a ``prev_state`` field for recurrent hidden states.

    Returns:
        - ret(:obj:`Dict[str, Union[torch.Tensor, list]]`): The collated data, with the timestep and batch size \
            incorporated into each data field. The shape of each data field is ``[T, B, dim1, dim2, ...]``.

    Examples:
        >>> batch = [
                {'data': [torch.tensor([1, 2, 3]), torch.tensor([4, 5, 6])], 'prev_state': None},
                {'data': [torch.tensor([7, 8, 9]), torch.tensor([10, 11, 12])], 'prev_state': None}
            ]
        >>> collated_data = timestep_collate(batch)
        >>> print(collated_data['data'].shape)
        torch.Size([2, 2, 3])
    """
    def stack(data):
        if isinstance(data, container_abcs.Mapping):
            return {k: stack(data[k]) for k in data}
        elif isinstance(data, container_abcs.Sequence) and isinstance(data[0], torch.Tensor):
            return torch.stack(data)
        else:
            return data

    elem = batch[0]
    assert isinstance(elem, (container_abcs.Mapping, list)), type(elem)
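    # hidden states (prev_state) cannot be stacked into tensors, so they are collated as nested lists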
    if isinstance(elem, list):
        prev_state = [[b[i].get('prev_state') for b in batch] for i in range(len(batch[0]))]
        batch_data = ttorch.stack([ttorch_collate(b) for b in batch])
        del batch_data.prev_state
        batch_data = batch_data.transpose(1, 0)
        batch_data.prev_state = prev_state
    else:
        prev_state = [b.pop('prev_state') for b in batch]
        batch_data = default_collate(batch)
        batch_data = stack(batch_data)
        transformed_prev_state = list(zip(*prev_state))
        batch_data['prev_state'] = transformed_prev_state
        # restore prev_state to the input batch, since pop() above mutated it in place
        for i in range(len(batch)):
            batch[i]['prev_state'] = prev_state[i]
    return batch_data


def diff_shape_collate(batch: Sequence) -> Union[torch.Tensor, Mapping, Sequence]:
    """
    Overview:
        Collates a batch of data whose fields may have missing values or inconsistent shapes.
        This function is similar to ``default_collate``, but it allows elements in the batch to be ``None`` \
        and tensors to differ in shape, which is common in StarCraft observations.

    Arguments:
        - batch (:obj:`Sequence`): A sequence of data, where each element is a piece of data.

    Returns:
        - ret (:obj:`Union[torch.Tensor, Mapping, Sequence]`): The collated data, with the batch size applied \
            to each data field. The return type depends on the original element type and can be a torch.Tensor, \
            Mapping, or Sequence.

    Examples:
        >>> # a list with B tensors shaped (m, n) -->> a tensor shaped (B, m, n)
        >>> a = [torch.zeros(2, 3) for _ in range(4)]
        >>> diff_shape_collate(a).shape
        torch.Size([4, 2, 3])
        >>>
        >>> # a list with B lists, each list contains m elements -->> a list of m tensors, each with shape (B, )
        >>> a = [[0 for __ in range(3)] for _ in range(4)]
        >>> diff_shape_collate(a)
        [tensor([0, 0, 0, 0]), tensor([0, 0, 0, 0]), tensor([0, 0, 0, 0])]
        >>>
        >>> # a list with B dicts, whose values are tensors shaped :math:`(m, n)` -->>
        >>> # a dict whose values are tensors with shape :math:`(B, m, n)`
        >>> a = [{i: torch.zeros(i, i + 1) for i in range(2, 4)} for _ in range(4)]
        >>> print(a[0][2].shape, a[0][3].shape)
        torch.Size([2, 3]) torch.Size([3, 4])
        >>> b = diff_shape_collate(a)
        >>> print(b[2].shape, b[3].shape)
        torch.Size([4, 2, 3]) torch.Size([4, 3, 4])
    """
    elem = batch[0]
    elem_type = type(elem)
    if any(e is None for e in batch):
        return batch
    elif isinstance(elem, torch.Tensor):
        shapes = [e.shape for e in batch]
        if len(set(shapes)) != 1:
            return batch  # tensors with different shapes cannot be stacked, so keep them as a list
        else:
            return torch.stack(batch, 0)
    elif elem_type.__module__ == 'numpy' and elem_type.__name__ != 'str_' \
            and elem_type.__name__ != 'string_':
        if elem_type.__name__ == 'ndarray':
            return diff_shape_collate([torch.as_tensor(b) for b in batch])
        elif elem.shape == ():  # scalars
            return torch.as_tensor(batch)
    elif isinstance(elem, float):
        return torch.tensor(batch, dtype=torch.float32)
    elif isinstance(elem, int_classes):
        dtype = torch.bool if isinstance(elem, bool) else torch.int64
        return torch.tensor(batch, dtype=dtype)
    elif isinstance(elem, Mapping):
        return {key: diff_shape_collate([d[key] for d in batch]) for key in elem}
    elif isinstance(elem, tuple) and hasattr(elem, '_fields'):  # namedtuple
        return elem_type(*(diff_shape_collate(samples) for samples in zip(*batch)))
    elif isinstance(elem, Sequence):
        transposed = zip(*batch)
        return [diff_shape_collate(samples) for samples in transposed]

    raise TypeError('unsupported element type: {}'.format(elem_type))


def default_decollate(
        batch: Union[torch.Tensor, Sequence, Mapping],
        ignore: List[str] = ['prev_state', 'prev_actor_state', 'prev_critic_state']
) -> List[Any]:
    """
    Overview:
        Split a collated batch along its batch dimension into a list of individual samples; the reverse \
        operation of ``default_collate``.

    Arguments:
        - batch (:obj:`Union[torch.Tensor, Sequence, Mapping]`): The collated data batch. It can be a tensor, \
            sequence, or mapping.
        - ignore(:obj:`List[str]`): A list of names to be ignored. Only applicable if the input ``batch`` is a \
            dictionary. If a key is in this list, its value remains the same without decollation. Defaults to \
            ['prev_state', 'prev_actor_state', 'prev_critic_state'].

    Returns:
        - ret (:obj:`List[Any]`): A list with B elements, where B is the batch size.

    Examples:
        >>> batch = {
                'a': torch.tensor([[1, 2, 3], [4, 5, 6]]),
                'b': torch.tensor([[7, 8, 9], [10, 11, 12]])
            }
        >>> default_decollate(batch)
        [
            {'a': tensor([1, 2, 3]), 'b': tensor([7, 8, 9])},
            {'a': tensor([4, 5, 6]), 'b': tensor([10, 11, 12])}
        ]
    """
    if isinstance(batch, torch.Tensor):
        batch = torch.split(batch, 1, dim=0)
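        # squeeze the leading dim of size 1 introduced by split, except for tensors that were 1-dimensional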
        if len(batch[0].shape) > 1:
            batch = [elem.squeeze(0) for elem in batch]
        return list(batch)
    elif isinstance(batch, Sequence):
        return list(zip(*[default_decollate(e) for e in batch]))
    elif isinstance(batch, Mapping):
        tmp = {k: v if k in ignore else default_decollate(v) for k, v in batch.items()}
        B = len(list(tmp.values())[0])
        return [{k: tmp[k][i] for k in tmp.keys()} for i in range(B)]
    elif isinstance(batch, torch.distributions.Distribution):
        return [None for _ in range(batch.batch_shape[0])]

    raise TypeError("Not supported batch type: {}".format(type(batch)))
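
# A minimal usage sketch (illustration only, not part of the public API): round-trips a batch of
# sample dicts through ``default_collate`` and ``default_decollate``.
if __name__ == '__main__':
    samples = [{'obs': torch.zeros(4), 'reward': 1.0} for _ in range(3)]
    collated = default_collate(samples)
    assert collated['obs'].shape == (3, 4)
    assert collated['reward'].dtype == torch.float32
    decollated = default_decollate(collated)
    assert len(decollated) == 3 and decollated[0]['obs'].shape == (4, )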
|