File size: 4,120 Bytes
079c32c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
from collections import defaultdict
from typing import Callable, Optional
from concurrent.futures import ThreadPoolExecutor
from copy import copy
import fnmatch
from ditk import logging
class EventLoop:
loops = {}
def __init__(self, name: str = "default") -> None:
self._name = name
self._listeners = defaultdict(list)
self._thread_pool = ThreadPoolExecutor(max_workers=2)
self._exception = None
self._active = True
def on(self, event: str, fn: Callable) -> None:
"""
Overview:
Subscribe to an event, execute this function every time the event is emitted.
Arguments:
- event (:obj:`str`): Event name.
- fn (:obj:`Callable`): The function.
"""
self._listeners[event].append(fn)
def off(self, event: str, fn: Optional[Callable] = None) -> None:
"""
Overview:
Unsubscribe an event, or a specific function in the event.
Arguments:
- event (:obj:`str`): Event name.
- fn (:obj:`Optional[Callable]`): The function.
"""
for e in fnmatch.filter(self._listeners.keys(), event):
if fn:
try:
self._listeners[e].remove(fn)
except:
pass
else:
self._listeners[e] = []
def once(self, event: str, fn: Callable) -> None:
"""
Overview:
Subscribe to an event, execute this function only once when the event is emitted.
Arguments:
- event (:obj:`str`): Event name.
- fn (:obj:`Callable`): The function.
"""
def once_callback(*args, **kwargs):
self.off(event, once_callback)
fn(*args, **kwargs)
self.on(event, once_callback)
def emit(self, event: str, *args, **kwargs) -> None:
"""
Overview:
Emit an event, call listeners.
If there is an unhandled error in this event loop, calling emit will raise an exception,
which will cause the process to exit.
Arguments:
- event (:obj:`str`): Event name.
"""
if self._exception:
raise self._exception
if self._active:
self._thread_pool.submit(self._trigger, event, *args, **kwargs)
def _trigger(self, event: str, *args, **kwargs) -> None:
"""
Overview:
Execute the callbacks under the event. If any callback raise an exception,
we will save the traceback and ignore the exception.
Arguments:
- event (:obj:`str`): Event name.
"""
if event not in self._listeners:
logging.debug("Event {} is not registered in the callbacks of {}!".format(event, self._name))
return
for fn in copy(self._listeners[event]):
try:
fn(*args, **kwargs)
except Exception as e:
self._exception = e
def listened(self, event: str) -> bool:
"""
Overview:
Check if the event has been listened to.
Arguments:
- event (:obj:`str`): Event name
Returns:
- listened (:obj:`bool`): Whether this event has been listened to.
"""
return event in self._listeners
@classmethod
def get_event_loop(cls: type, name: str = "default") -> "EventLoop":
"""
Overview:
Get new event loop when name not exists, or return the existed instance.
Arguments:
- name (:obj:`str`): Name of event loop.
"""
if name in cls.loops:
return cls.loops[name]
cls.loops[name] = loop = cls(name)
return loop
def stop(self) -> None:
self._active = False
self._listeners = defaultdict(list)
self._exception = None
self._thread_pool.shutdown()
if self._name in EventLoop.loops:
del EventLoop.loops[self._name]
def __del__(self) -> None:
if self._active:
self.stop()
|