File size: 6,711 Bytes
079c32c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 |
import pytest
import torch
from ding.data.buffer import DequeBuffer
from ding.data.buffer.middleware import clone_object, use_time_check, staleness_check, sample_range_view
from ding.data.buffer.middleware import PriorityExperienceReplay, group_sample
from ding.data.buffer.middleware.padding import padding
@pytest.mark.unittest
def test_clone_object():
    """clone_object middleware must hand out deep copies, so mutating a
    sampled item never corrupts the record stored in the buffer."""
    buffer = DequeBuffer(size=10).use(clone_object())
    # Push one dict, one list and one tensor.
    samples = [{"key": "v1"}, ["a"], torch.Tensor([1, 2, 3])]
    for sample in samples:
        buffer.push(sample)
    # Mutate every sampled item in place.
    for buffered in buffer.sample(len(samples)):
        data = buffered.data
        if isinstance(data, dict):
            data["key"] = "v2"
        elif isinstance(data, list):
            data.append("b")
        elif isinstance(data, torch.Tensor):
            data[0] = 3
        else:
            raise Exception("Unexpected type")
    # Resample: the stored originals must be unchanged.
    for buffered in buffer.sample(len(samples)):
        data = buffered.data
        if isinstance(data, dict):
            assert data["key"] == "v1"
        elif isinstance(data, list):
            assert len(data) == 1
        elif isinstance(data, torch.Tensor):
            assert data[0] == 1
        else:
            raise Exception("Unexpected type")
def get_data():
    """Build a fake transition: random obs/reward tensors plus a string info field."""
    return {
        'obs': torch.randn(4),
        'reward': torch.randn(1),
        'info': 'xxx',
    }
@pytest.mark.unittest
def test_use_time_check():
    """With max_use=2 each record can be sampled twice before it is dropped."""
    num_items = 6
    buffer = DequeBuffer(size=10)
    buffer.use(use_time_check(buffer, max_use=2))
    for _ in range(num_items):
        buffer.push(get_data())
    # Two full sweeps succeed: each record is used exactly twice.
    for _ in range(2):
        sampled = buffer.sample(size=num_items, replace=False)
        assert len(sampled) == num_items
    # Every record has exhausted its use budget, so sampling now fails.
    with pytest.raises(ValueError):
        buffer.sample(size=1, replace=False)
@pytest.mark.unittest
def test_staleness_check():
    """Records older than max_staleness train iterations are rejected at sample time."""
    num_items = 6
    buffer = DequeBuffer(size=10)
    buffer.use(staleness_check(buffer, max_staleness=10))
    # Pushing without the collect-iteration meta is rejected by the middleware.
    with pytest.raises(AssertionError):
        buffer.push(get_data())
    for _ in range(num_items):
        buffer.push(get_data(), meta={'train_iter_data_collected': 0})
    data = buffer.sample(size=num_items, replace=False, train_iter_sample_data=9)
    assert len(data) == num_items
    # Edge case: staleness exactly equal to max_staleness is still accepted.
    data = buffer.sample(size=num_items, replace=False, train_iter_sample_data=10)
    assert len(data) == num_items
    for _ in range(2):
        buffer.push(get_data(), meta={'train_iter_data_collected': 5})
    assert buffer.count() == 8
    # At iter 11 the six iter-0 records are too stale: they get evicted and
    # the sample request fails; only the two iter-5 records remain.
    with pytest.raises(ValueError):
        data = buffer.sample(size=num_items, replace=False, train_iter_sample_data=11)
    assert buffer.count() == 2
@pytest.mark.unittest
def test_priority():
    """PriorityExperienceReplay (priority given via meta): push, sample with
    IS weights, update priorities, delete and clear."""
    N = 5
    buffer = DequeBuffer(size=10)
    buffer.use(PriorityExperienceReplay(buffer, IS_weight=True))
    for _ in range(N):
        buffer.push(get_data(), meta={'priority': 2.0})
    assert buffer.count() == N
    for _ in range(N):
        buffer.push(get_data(), meta={'priority': 2.0})
    assert buffer.count() == N + N
    data = buffer.sample(size=N + N, replace=False)
    assert len(data) == N + N
    for item in data:
        meta = item.meta
        # The middleware must attach its bookkeeping keys to each record.
        assert set(meta.keys()).issuperset(set(['priority', 'priority_idx', 'priority_IS']))
        meta['priority'] = 3.0
    # Write the raised priorities back. NOTE: the original code unpacked into
    # `data, index, meta`, rebinding (shadowing) the very list being iterated;
    # use item attributes directly instead.
    for item in data:
        buffer.update(item.index, item.data, item.meta)
    sampled = buffer.sample(size=1)
    assert sampled[0].meta['priority'] == 3.0
    buffer.delete(sampled[0].index)
    assert buffer.count() == N + N - 1
    buffer.clear()
    assert buffer.count() == 0
@pytest.mark.unittest
def test_priority_from_collector():
    """PriorityExperienceReplay with the priority embedded in the data dict
    (collector-style) rather than passed via meta."""
    N = 5
    buffer = DequeBuffer(size=10)
    buffer.use(PriorityExperienceReplay(buffer, IS_weight=True))
    for _ in range(N):
        tmp_data = get_data()
        tmp_data['priority'] = 2.0
        # Bug fix: previously pushed a fresh `get_data()`, silently dropping
        # `tmp_data` and never exercising the in-data priority path.
        buffer.push(tmp_data)
    assert buffer.count() == N
    for _ in range(N):
        tmp_data = get_data()
        tmp_data['priority'] = 2.0
        buffer.push(tmp_data)
    assert buffer.count() == N + N
    data = buffer.sample(size=N + N, replace=False)
    assert len(data) == N + N
    for item in data:
        meta = item.meta
        assert set(meta.keys()).issuperset(set(['priority', 'priority_idx', 'priority_IS']))
        meta['priority'] = 3.0
    # Write the raised priorities back without shadowing the `data` list.
    for item in data:
        buffer.update(item.index, item.data, item.meta)
    sampled = buffer.sample(size=1)
    assert sampled[0].meta['priority'] == 3.0
    buffer.delete(sampled[0].index)
    assert buffer.count() == N + N - 1
    buffer.clear()
    assert buffer.count() == 0
@pytest.mark.unittest
def test_padding():
    """padding middleware pads each sampled group up to the largest group size."""
    buffer = DequeBuffer(size=10)
    buffer.use(padding())
    # i & 5 buckets 0..9 into groups 0/1/4/5 with sizes [3, 3, 2, 2].
    for i in range(10):
        buffer.push(i, {"group": i & 5})
    grouped_samples = buffer.sample(4, groupby="group")
    assert len(grouped_samples) == 4
    # After padding, every group is as long as the biggest one (3 records).
    for group in grouped_samples:
        assert len(group) == 3
@pytest.mark.unittest
def test_group_sample():
    """group_sample on top of padding(policy='none'): each sampled group has a
    fixed size, with 'none'-padded slots carrying data=None."""
    buffer = DequeBuffer(size=10)
    buffer.use(padding(policy="none")).use(
        group_sample(size_in_group=5, ordered_in_group=True, max_use_in_group=True)
    )
    # Episode 0 has 4 records (needs one None pad), episode 1 has 6.
    for i in range(4):
        buffer.push(i, {"episode": 0})
    for i in range(6):
        buffer.push(i, {"episode": 1})
    sampled_data = buffer.sample(2, groupby="episode")
    assert len(sampled_data) == 2

    def check_group0(grouped_data):
        # In group0 should find only last record with data as None
        none_count = sum(1 for item in grouped_data if item.data is None)
        assert none_count == 1

    def check_group1(grouped_data):
        # In group1 every record should have data and meta
        assert all(item.data is not None for item in grouped_data)

    for grouped_data in sampled_data:
        assert len(grouped_data) == 5
        meta = grouped_data[0].meta
        if meta and "episode" in meta and meta["episode"] == 1:
            check_group1(grouped_data)
        else:
            check_group0(grouped_data)
@pytest.mark.unittest
def test_sample_range_view():
    """sample_range_view restricts sampling to a negative-indexed slice of the buffer."""
    buffer_ = DequeBuffer(size=10)
    # Layout after pushes: indices 0-4 -> 'x', 5-7 -> 'y', 8-9 -> 'z'.
    for _ in range(5):
        buffer_.push({'data': 'x'})
    for _ in range(3):
        buffer_.push({'data': 'y'})
    for _ in range(2):
        buffer_.push({'data': 'z'})
    # View over [-5, -2): exactly the three 'y' records.
    buffer1 = buffer_.view()
    buffer1.use(sample_range_view(buffer1, start=-5, end=-2))
    for _ in range(10):
        sampled_data = buffer1.sample(1)
        assert sampled_data[0].data['data'] == 'y'
    # View over the last two records: only 'z'.
    buffer2 = buffer_.view()
    # Bug fix: the middleware must be bound to buffer2 (was mistakenly buffer1).
    buffer2.use(sample_range_view(buffer2, start=-2))
    for _ in range(10):
        sampled_data = buffer2.sample(1)
        assert sampled_data[0].data['data'] == 'z'
|