import pickle from abc import abstractmethod, ABCMeta from collections import deque from threading import Lock from typing import TypeVar, Iterable, List, Tuple, Union from .time_ctl import BaseTime _Tp = TypeVar('_Tp') class RangedData(metaclass=ABCMeta): """ Overview: A data structure that can store data for a period of time. Interfaces: ``__init__``, ``append``, ``extend``, ``current``, ``history``, ``expire``, ``__bool__``, ``_get_time``. Properties: - expire (:obj:`float`): The expire time. """ def __init__(self, expire: float, use_pickle: bool = False): """ Overview: Initialize the RangedData object. Arguments: - expire (:obj:`float`): The expire time of the data. - use_pickle (:obj:`bool`): Whether to use pickle to serialize the data. """ self.__expire = expire self.__use_pickle = use_pickle self.__check_expire() self.__data_max_id = 0 self.__data_items = {} self.__data_lock = Lock() self.__last_item = None self.__queue = deque() self.__lock = Lock() def __check_expire(self): """ Overview: Check the expire time. """ if isinstance(self.__expire, (int, float)): if self.__expire <= 0: raise ValueError( "Expire should be greater than 0, but {actual} found.".format(actual=repr(self.__expire)) ) else: raise TypeError( 'Expire should be int or float, but {actual} found.'.format(actual=type(self.__expire).__name__) ) def __registry_data_item(self, data: _Tp) -> int: """ Overview: Registry the data item. Arguments: - data (:obj:`_Tp`): The data item. """ with self.__data_lock: self.__data_max_id += 1 if self.__use_pickle: self.__data_items[self.__data_max_id] = pickle.dumps(data) else: self.__data_items[self.__data_max_id] = data return self.__data_max_id def __get_data_item(self, data_id: int) -> _Tp: """ Overview: Get the data item. Arguments: - data_id (:obj:`int`): The data id. """ with self.__data_lock: if self.__use_pickle: return pickle.loads(self.__data_items[data_id]) else: return self.__data_items[data_id] def __remove_data_item(self, data_id: int): """ Overview: Remove the data item. Arguments: - data_id (:obj:`int`): The data id. """ with self.__data_lock: del self.__data_items[data_id] def __check_time(self, time_: float): """ Overview: Check the time. Arguments: - time_ (:obj:`float`): The time. """ if self.__queue: _time, _ = self.__queue[-1] if time_ < _time: raise ValueError( "Time {time} invalid for descending from last time {last_time}".format( time=repr(time_), last_time=repr(_time) ) ) def __append_item(self, time_: float, data: _Tp): """ Overview: Append the data item. Arguments: - time_ (:obj:`float`): The time. - data (:obj:`_Tp`): The data item. """ self.__queue.append((time_, self.__registry_data_item(data))) def __flush_history(self): """ Overview: Flush the history data. """ _time = self._get_time() _limit_time = _time - self.__expire while self.__queue: _head_time, _head_id = self.__queue.popleft() if _head_time >= _limit_time: self.__queue.appendleft((_head_time, _head_id)) break else: if self.__last_item: _last_time, _last_id = self.__last_item self.__remove_data_item(_last_id) self.__last_item = (_head_time, _head_id) def __append(self, time_: float, data: _Tp): """ Overview: Append the data. """ self.__check_time(time_) self.__append_item(time_, data) self.__flush_history() def __current(self): """ Overview: Get the current data. """ if self.__queue: _tail_time, _tail_id = self.__queue.pop() self.__queue.append((_tail_time, _tail_id)) return self.__get_data_item(_tail_id) elif self.__last_item: _last_time, _last_id = self.__last_item return self.__get_data_item(_last_id) else: raise ValueError("This range is empty.") def __history_yield(self): """ Overview: Yield the history data. """ _time = self._get_time() _limit_time = _time - self.__expire _latest_time, _latest_id = None, None if self.__last_item: _latest_time, _latest_id = _last_time, _last_id = self.__last_item yield max(_last_time, _limit_time), self.__get_data_item(_last_id) for _item_time, _item_id in self.__queue: _latest_time, _latest_id = _item_time, _item_id yield _item_time, self.__get_data_item(_item_id) if _latest_time is not None and _latest_time < _time: yield _time, self.__get_data_item(_latest_id) def __history(self): """ Overview: Get the history data. """ return list(self.__history_yield()) def append(self, data: _Tp): """ Overview: Append the data. """ with self.__lock: self.__flush_history() _time = self._get_time() self.__append(_time, data) return self def extend(self, iter_: Iterable[_Tp]): """ Overview: Extend the data. """ with self.__lock: self.__flush_history() _time = self._get_time() for item in iter_: self.__append(_time, item) return self def current(self) -> _Tp: """ Overview: Get the current data. """ with self.__lock: self.__flush_history() return self.__current() def history(self) -> List[Tuple[Union[int, float], _Tp]]: """ Overview: Get the history data. """ with self.__lock: self.__flush_history() return self.__history() @property def expire(self) -> float: """ Overview: Get the expire time. """ with self.__lock: self.__flush_history() return self.__expire def __bool__(self): """ Overview: Check whether the range is empty. """ with self.__lock: self.__flush_history() return not not (self.__queue or self.__last_item) @abstractmethod def _get_time(self) -> float: """ Overview: Get the current time. """ raise NotImplementedError class TimeRangedData(RangedData): """ Overview: A data structure that can store data for a period of time. Interfaces: ``__init__``, ``_get_time``, ``append``, ``extend``, ``current``, ``history``, ``expire``, ``__bool__``. Properties: - time (:obj:`BaseTime`): The time. - expire (:obj:`float`): The expire time. """ def __init__(self, time_: BaseTime, expire: float): """ Overview: Initialize the TimeRangedData object. Arguments: - time_ (:obj:`BaseTime`): The time. - expire (:obj:`float`): The expire time. """ RangedData.__init__(self, expire) self.__time = time_ def _get_time(self) -> float: """ Overview: Get the current time. """ return self.__time.time() @property def time(self): """ Overview: Get the time. """ return self.__time