zjowowen's picture
init space
079c32c
raw
history blame
1.7 kB
import threading
import time
from threading import Thread
import numpy as np
import pytest
from ding.utils.data.structure import Cache
@pytest.mark.unittest
class TestCache:
cache = Cache(16, 4, monitor_interval=1.0, _debug=True)
send_count = 0
produce_count = 0
def producer(self, id):
time.sleep(1)
begin_time = time.time()
count = 0
while time.time() - begin_time < 20:
t = np.random.randint(1, 6)
time.sleep(t)
print('[PRODUCER] thread {} use {} second to produce a data'.format(id, t))
self.cache.push_data({'data': []})
count += 1
print('[PRODUCER] thread {} finish job, total produce {} data'.format(id, count))
self.produce_count += count
def consumer(self):
for data in self.cache.get_cached_data_iter():
self.send_count += 1
print('[CONSUMER] cache send {}'.format(self.send_count))
def test(self):
producer_num = 8
self.cache.run()
threadings = [Thread(target=self.producer, args=(i, )) for i in range(producer_num)]
for t in threadings:
t.start()
consumer_thread = Thread(target=self.consumer)
consumer_thread.start()
for t in threadings:
t.join()
# wait timeout mechanism to clear the cache
time.sleep(4 + 1 + 0.1)
assert (self.cache.remain_data_count == 0)
assert (self.send_count == self.produce_count)
self.cache.close()
# wait the cache internal thread close and the consumer_thread get 'STOP' signal
time.sleep(1 + 0.5)
assert (not consumer_thread.is_alive())