File size: 5,308 Bytes
079c32c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import time
from queue import Queue
from threading import Thread
from typing import Any

from ding.utils import LockContext, LockContextType


class Cache:
    """
    Overview:
        Data cache for reducing concurrent pressure, with timeout and full queue eject mechanism
    Interfaces:
        ``__init__``, ``push_data``, ``get_cached_data_iter``, ``run``, ``close``
    Property:
        remain_data_count
    """

    def __init__(self, maxlen: int, timeout: float, monitor_interval: float = 1.0, _debug: bool = False) -> None:
        """
        Overview:
            Initialize the cache object.
        Arguments:
            - maxlen (:obj:`int`): Maximum length of the cache queue.
            - timeout (:obj:`float`): Maximum second of the data can remain in the cache.
            - monitor_interval (:obj:`float`): Interval of the timeout monitor thread checks the time.
            - _debug (:obj:`bool`): Whether to use debug mode or not, which enables debug print info.
        """
        assert maxlen > 0
        self.maxlen = maxlen
        self.timeout = timeout
        self.monitor_interval = monitor_interval
        self.debug = _debug
        # two separate receive and send queue for reducing interaction frequency and interference
        self.receive_queue = Queue(maxlen)
        self.send_queue = Queue(maxlen)
        self.receive_lock = LockContext(type_=LockContextType.THREAD_LOCK)
        self._timeout_thread = Thread(target=self._timeout_monitor)
        # the bool flag for gracefully shutting down the timeout monitor thread
        self._timeout_thread_flag = True

    def push_data(self, data: Any) -> None:
        """
        Overview:
            Push data into receive queue, if the receive queue is full(after push), then push all the data
            in receive queue into send queue.
        Arguments:
            - data (:obj:`Any`): The data which needs to be added into receive queue

        .. tip::
            thread-safe
        """
        with self.receive_lock:
            # Push the data item and current time together into queue
            self.receive_queue.put([data, time.time()])
            if self.receive_queue.full():
                self.dprint('send total receive_queue, current len:{}'.format(self.receive_queue.qsize()))
                while not self.receive_queue.empty():
                    # Only send raw data to send queue
                    self.send_queue.put(self.receive_queue.get()[0])

    def get_cached_data_iter(self) -> 'callable_iterator':  # noqa
        """
        Overview:
            Get the iterator of the send queue. Once a data is pushed into send queue, it can be accessed by
            this iterator. 'STOP' is the end flag of this iterator.
        Returns:
            - iterator (:obj:`callable_iterator`) The send queue iterator.
        """
        return iter(self.send_queue.get, 'STOP')

    def _timeout_monitor(self) -> None:
        """
        Overview:
            The workflow of the timeout monitor thread.
        """
        # Loop until the flag is set to False
        while self._timeout_thread_flag:
            # A fixed check interval
            time.sleep(self.monitor_interval)
            with self.receive_lock:
                # For non-empty receive_queue, check the time from head to tail(only access no pop) until finding
                # the first data which is not timeout
                while not self.receive_queue.empty():
                    # Check the time of the data remains in the receive_queue, if excesses the timeout then returns True
                    is_timeout = self._warn_if_timeout()
                    if not is_timeout:
                        break

    def _warn_if_timeout(self) -> bool:
        """
        Overview:
            Return whether is timeout.
        Returns
            - result: (:obj:`bool`) Whether is timeout.
        """
        wait_time = time.time() - self.receive_queue.queue[0][1]
        if wait_time >= self.timeout:
            self.dprint(
                'excess the maximum wait time, eject from the cache.(wait_time/timeout: {}/{}'.format(
                    wait_time, self.timeout
                )
            )
            self.send_queue.put(self.receive_queue.get()[0])
            return True
        else:
            return False

    def run(self) -> None:
        """
        Overview:
            Launch the cache internal thread, e.g. timeout monitor thread.
        """
        self._timeout_thread.start()

    def close(self) -> None:
        """
        Overview:
            Shut down the cache internal thread and send the end flag to send queue's iterator.
        """
        self._timeout_thread_flag = False
        self.send_queue.put('STOP')

    def dprint(self, s: str) -> None:
        """
        Overview:
            In debug mode, print debug str.
        Arguments:
            - s (:obj:`str`): Debug info to be printed.
        """
        if self.debug:
            print('[CACHE] ' + s)

    @property
    def remain_data_count(self) -> int:
        """
        Overview:
            Return receive queue's remain data count
        Returns:
            - count (:obj:`int`): The size of the receive queue.
        """
        return self.receive_queue.qsize()