from collections import defaultdict from typing import Callable, Optional from concurrent.futures import ThreadPoolExecutor from copy import copy import fnmatch from ditk import logging class EventLoop: loops = {} def __init__(self, name: str = "default") -> None: self._name = name self._listeners = defaultdict(list) self._thread_pool = ThreadPoolExecutor(max_workers=2) self._exception = None self._active = True def on(self, event: str, fn: Callable) -> None: """ Overview: Subscribe to an event, execute this function every time the event is emitted. Arguments: - event (:obj:`str`): Event name. - fn (:obj:`Callable`): The function. """ self._listeners[event].append(fn) def off(self, event: str, fn: Optional[Callable] = None) -> None: """ Overview: Unsubscribe an event, or a specific function in the event. Arguments: - event (:obj:`str`): Event name. - fn (:obj:`Optional[Callable]`): The function. """ for e in fnmatch.filter(self._listeners.keys(), event): if fn: try: self._listeners[e].remove(fn) except: pass else: self._listeners[e] = [] def once(self, event: str, fn: Callable) -> None: """ Overview: Subscribe to an event, execute this function only once when the event is emitted. Arguments: - event (:obj:`str`): Event name. - fn (:obj:`Callable`): The function. """ def once_callback(*args, **kwargs): self.off(event, once_callback) fn(*args, **kwargs) self.on(event, once_callback) def emit(self, event: str, *args, **kwargs) -> None: """ Overview: Emit an event, call listeners. If there is an unhandled error in this event loop, calling emit will raise an exception, which will cause the process to exit. Arguments: - event (:obj:`str`): Event name. """ if self._exception: raise self._exception if self._active: self._thread_pool.submit(self._trigger, event, *args, **kwargs) def _trigger(self, event: str, *args, **kwargs) -> None: """ Overview: Execute the callbacks under the event. If any callback raise an exception, we will save the traceback and ignore the exception. Arguments: - event (:obj:`str`): Event name. """ if event not in self._listeners: logging.debug("Event {} is not registered in the callbacks of {}!".format(event, self._name)) return for fn in copy(self._listeners[event]): try: fn(*args, **kwargs) except Exception as e: self._exception = e def listened(self, event: str) -> bool: """ Overview: Check if the event has been listened to. Arguments: - event (:obj:`str`): Event name Returns: - listened (:obj:`bool`): Whether this event has been listened to. """ return event in self._listeners @classmethod def get_event_loop(cls: type, name: str = "default") -> "EventLoop": """ Overview: Get new event loop when name not exists, or return the existed instance. Arguments: - name (:obj:`str`): Name of event loop. """ if name in cls.loops: return cls.loops[name] cls.loops[name] = loop = cls(name) return loop def stop(self) -> None: self._active = False self._listeners = defaultdict(list) self._exception = None self._thread_pool.shutdown() if self._name in EventLoop.loops: del EventLoop.loops[self._name] def __del__(self) -> None: if self._active: self.stop()