zjowowen's picture
init space
079c32c
import pickle
from abc import abstractmethod, ABCMeta
from collections import deque
from threading import Lock
from typing import TypeVar, Iterable, List, Tuple, Union
from .time_ctl import BaseTime
_Tp = TypeVar('_Tp')
class RangedData(metaclass=ABCMeta):
"""
Overview:
A data structure that can store data for a period of time.
Interfaces:
``__init__``, ``append``, ``extend``, ``current``, ``history``, ``expire``, ``__bool__``, ``_get_time``.
Properties:
- expire (:obj:`float`): The expire time.
"""
def __init__(self, expire: float, use_pickle: bool = False):
"""
Overview:
Initialize the RangedData object.
Arguments:
- expire (:obj:`float`): The expire time of the data.
- use_pickle (:obj:`bool`): Whether to use pickle to serialize the data.
"""
self.__expire = expire
self.__use_pickle = use_pickle
self.__check_expire()
self.__data_max_id = 0
self.__data_items = {}
self.__data_lock = Lock()
self.__last_item = None
self.__queue = deque()
self.__lock = Lock()
def __check_expire(self):
"""
Overview:
Check the expire time.
"""
if isinstance(self.__expire, (int, float)):
if self.__expire <= 0:
raise ValueError(
"Expire should be greater than 0, but {actual} found.".format(actual=repr(self.__expire))
)
else:
raise TypeError(
'Expire should be int or float, but {actual} found.'.format(actual=type(self.__expire).__name__)
)
def __registry_data_item(self, data: _Tp) -> int:
"""
Overview:
Registry the data item.
Arguments:
- data (:obj:`_Tp`): The data item.
"""
with self.__data_lock:
self.__data_max_id += 1
if self.__use_pickle:
self.__data_items[self.__data_max_id] = pickle.dumps(data)
else:
self.__data_items[self.__data_max_id] = data
return self.__data_max_id
def __get_data_item(self, data_id: int) -> _Tp:
"""
Overview:
Get the data item.
Arguments:
- data_id (:obj:`int`): The data id.
"""
with self.__data_lock:
if self.__use_pickle:
return pickle.loads(self.__data_items[data_id])
else:
return self.__data_items[data_id]
def __remove_data_item(self, data_id: int):
"""
Overview:
Remove the data item.
Arguments:
- data_id (:obj:`int`): The data id.
"""
with self.__data_lock:
del self.__data_items[data_id]
def __check_time(self, time_: float):
"""
Overview:
Check the time.
Arguments:
- time_ (:obj:`float`): The time.
"""
if self.__queue:
_time, _ = self.__queue[-1]
if time_ < _time:
raise ValueError(
"Time {time} invalid for descending from last time {last_time}".format(
time=repr(time_), last_time=repr(_time)
)
)
def __append_item(self, time_: float, data: _Tp):
"""
Overview:
Append the data item.
Arguments:
- time_ (:obj:`float`): The time.
- data (:obj:`_Tp`): The data item.
"""
self.__queue.append((time_, self.__registry_data_item(data)))
def __flush_history(self):
"""
Overview:
Flush the history data.
"""
_time = self._get_time()
_limit_time = _time - self.__expire
while self.__queue:
_head_time, _head_id = self.__queue.popleft()
if _head_time >= _limit_time:
self.__queue.appendleft((_head_time, _head_id))
break
else:
if self.__last_item:
_last_time, _last_id = self.__last_item
self.__remove_data_item(_last_id)
self.__last_item = (_head_time, _head_id)
def __append(self, time_: float, data: _Tp):
"""
Overview:
Append the data.
"""
self.__check_time(time_)
self.__append_item(time_, data)
self.__flush_history()
def __current(self):
"""
Overview:
Get the current data.
"""
if self.__queue:
_tail_time, _tail_id = self.__queue.pop()
self.__queue.append((_tail_time, _tail_id))
return self.__get_data_item(_tail_id)
elif self.__last_item:
_last_time, _last_id = self.__last_item
return self.__get_data_item(_last_id)
else:
raise ValueError("This range is empty.")
def __history_yield(self):
"""
Overview:
Yield the history data.
"""
_time = self._get_time()
_limit_time = _time - self.__expire
_latest_time, _latest_id = None, None
if self.__last_item:
_latest_time, _latest_id = _last_time, _last_id = self.__last_item
yield max(_last_time, _limit_time), self.__get_data_item(_last_id)
for _item_time, _item_id in self.__queue:
_latest_time, _latest_id = _item_time, _item_id
yield _item_time, self.__get_data_item(_item_id)
if _latest_time is not None and _latest_time < _time:
yield _time, self.__get_data_item(_latest_id)
def __history(self):
"""
Overview:
Get the history data.
"""
return list(self.__history_yield())
def append(self, data: _Tp):
"""
Overview:
Append the data.
"""
with self.__lock:
self.__flush_history()
_time = self._get_time()
self.__append(_time, data)
return self
def extend(self, iter_: Iterable[_Tp]):
"""
Overview:
Extend the data.
"""
with self.__lock:
self.__flush_history()
_time = self._get_time()
for item in iter_:
self.__append(_time, item)
return self
def current(self) -> _Tp:
"""
Overview:
Get the current data.
"""
with self.__lock:
self.__flush_history()
return self.__current()
def history(self) -> List[Tuple[Union[int, float], _Tp]]:
"""
Overview:
Get the history data.
"""
with self.__lock:
self.__flush_history()
return self.__history()
@property
def expire(self) -> float:
"""
Overview:
Get the expire time.
"""
with self.__lock:
self.__flush_history()
return self.__expire
def __bool__(self):
"""
Overview:
Check whether the range is empty.
"""
with self.__lock:
self.__flush_history()
return not not (self.__queue or self.__last_item)
@abstractmethod
def _get_time(self) -> float:
"""
Overview:
Get the current time.
"""
raise NotImplementedError
class TimeRangedData(RangedData):
"""
Overview:
A data structure that can store data for a period of time.
Interfaces:
``__init__``, ``_get_time``, ``append``, ``extend``, ``current``, ``history``, ``expire``, ``__bool__``.
Properties:
- time (:obj:`BaseTime`): The time.
- expire (:obj:`float`): The expire time.
"""
def __init__(self, time_: BaseTime, expire: float):
"""
Overview:
Initialize the TimeRangedData object.
Arguments:
- time_ (:obj:`BaseTime`): The time.
- expire (:obj:`float`): The expire time.
"""
RangedData.__init__(self, expire)
self.__time = time_
def _get_time(self) -> float:
"""
Overview:
Get the current time.
"""
return self.__time.time()
@property
def time(self):
"""
Overview:
Get the time.
"""
return self.__time