zjowowen's picture
init space
079c32c
import time
from queue import Queue
from threading import Thread
from typing import Any
from ding.utils import LockContext, LockContextType
class Cache:
"""
Overview:
Data cache for reducing concurrent pressure, with timeout and full queue eject mechanism
Interfaces:
``__init__``, ``push_data``, ``get_cached_data_iter``, ``run``, ``close``
Property:
remain_data_count
"""
def __init__(self, maxlen: int, timeout: float, monitor_interval: float = 1.0, _debug: bool = False) -> None:
"""
Overview:
Initialize the cache object.
Arguments:
- maxlen (:obj:`int`): Maximum length of the cache queue.
- timeout (:obj:`float`): Maximum second of the data can remain in the cache.
- monitor_interval (:obj:`float`): Interval of the timeout monitor thread checks the time.
- _debug (:obj:`bool`): Whether to use debug mode or not, which enables debug print info.
"""
assert maxlen > 0
self.maxlen = maxlen
self.timeout = timeout
self.monitor_interval = monitor_interval
self.debug = _debug
# two separate receive and send queue for reducing interaction frequency and interference
self.receive_queue = Queue(maxlen)
self.send_queue = Queue(maxlen)
self.receive_lock = LockContext(type_=LockContextType.THREAD_LOCK)
self._timeout_thread = Thread(target=self._timeout_monitor)
# the bool flag for gracefully shutting down the timeout monitor thread
self._timeout_thread_flag = True
def push_data(self, data: Any) -> None:
"""
Overview:
Push data into receive queue, if the receive queue is full(after push), then push all the data
in receive queue into send queue.
Arguments:
- data (:obj:`Any`): The data which needs to be added into receive queue
.. tip::
thread-safe
"""
with self.receive_lock:
# Push the data item and current time together into queue
self.receive_queue.put([data, time.time()])
if self.receive_queue.full():
self.dprint('send total receive_queue, current len:{}'.format(self.receive_queue.qsize()))
while not self.receive_queue.empty():
# Only send raw data to send queue
self.send_queue.put(self.receive_queue.get()[0])
def get_cached_data_iter(self) -> 'callable_iterator': # noqa
"""
Overview:
Get the iterator of the send queue. Once a data is pushed into send queue, it can be accessed by
this iterator. 'STOP' is the end flag of this iterator.
Returns:
- iterator (:obj:`callable_iterator`) The send queue iterator.
"""
return iter(self.send_queue.get, 'STOP')
def _timeout_monitor(self) -> None:
"""
Overview:
The workflow of the timeout monitor thread.
"""
# Loop until the flag is set to False
while self._timeout_thread_flag:
# A fixed check interval
time.sleep(self.monitor_interval)
with self.receive_lock:
# For non-empty receive_queue, check the time from head to tail(only access no pop) until finding
# the first data which is not timeout
while not self.receive_queue.empty():
# Check the time of the data remains in the receive_queue, if excesses the timeout then returns True
is_timeout = self._warn_if_timeout()
if not is_timeout:
break
def _warn_if_timeout(self) -> bool:
"""
Overview:
Return whether is timeout.
Returns
- result: (:obj:`bool`) Whether is timeout.
"""
wait_time = time.time() - self.receive_queue.queue[0][1]
if wait_time >= self.timeout:
self.dprint(
'excess the maximum wait time, eject from the cache.(wait_time/timeout: {}/{}'.format(
wait_time, self.timeout
)
)
self.send_queue.put(self.receive_queue.get()[0])
return True
else:
return False
def run(self) -> None:
"""
Overview:
Launch the cache internal thread, e.g. timeout monitor thread.
"""
self._timeout_thread.start()
def close(self) -> None:
"""
Overview:
Shut down the cache internal thread and send the end flag to send queue's iterator.
"""
self._timeout_thread_flag = False
self.send_queue.put('STOP')
def dprint(self, s: str) -> None:
"""
Overview:
In debug mode, print debug str.
Arguments:
- s (:obj:`str`): Debug info to be printed.
"""
if self.debug:
print('[CACHE] ' + s)
@property
def remain_data_count(self) -> int:
"""
Overview:
Return receive queue's remain data count
Returns:
- count (:obj:`int`): The size of the receive queue.
"""
return self.receive_queue.qsize()