|
import pynng |
|
from ditk import logging |
|
from typing import List, Optional, Tuple |
|
from pynng import Bus0 |
|
from time import sleep |
|
|
|
from ding.framework.message_queue.mq import MQ |
|
from ding.utils import MQ_REGISTRY |
|
|
|
|
|
@MQ_REGISTRY.register("nng")
class NNGMQ(MQ):
    """
    Overview:
        Message queue implementation on top of a pynng ``Bus0`` socket, registered in \
        ``MQ_REGISTRY`` under the name ``"nng"``. Each outgoing message is the topic, the \
        separator ``::`` and the raw payload; ``recv`` splits incoming messages on the first \
        ``::`` to recover the topic.
    """

    def __init__(self, listen_to: str, attach_to: Optional[List[str]] = None, **kwargs) -> None:
        """
        Overview:
            Connect distributed processes with nng
        Arguments:
            - listen_to (:obj:`str`): The address this node listens on.
            - attach_to (:obj:`Optional[List[str]]`): The node's addresses you want to attach to.
            - kwargs: Accepted and ignored by this implementation.
        """
        self.listen_to = listen_to
        self.attach_to = attach_to or []
        # The socket is created lazily in ``listen`` and released in ``stop``.
        self._sock: Bus0 = None
        self._running = False

    def listen(self) -> None:
        """
        Overview:
            Create the bus socket, listen on ``listen_to``, dial every address in \
            ``attach_to`` and mark the queue as running.
        """
        self._sock = sock = Bus0()
        sock.listen(self.listen_to)
        # NOTE(review): presumably gives the listener time to come up before dialing - confirm.
        sleep(0.1)
        for contact in self.attach_to:
            sock.dial(contact)
        logging.info("NNG listen on {}, attach to {}".format(self.listen_to, self.attach_to))
        self._running = True

    def publish(self, topic: str, data: bytes) -> None:
        """
        Overview:
            Send ``data`` on the bus, prefixed with ``topic`` and the ``::`` separator. \
            Does nothing when the queue is not running.
        Arguments:
            - topic (:obj:`str`): Topic name placed in front of the payload.
            - data (:obj:`bytes`): Raw payload.
        """
        if self._running:
            topic += "::"
            data = topic.encode() + data
            self._sock.send(data)

    def subscribe(self, topic: str) -> None:
        """
        Overview:
            No-op in this implementation; ``recv`` returns messages of every topic.
        """
        return

    def unsubscribe(self, topic: str) -> None:
        """
        Overview:
            No-op in this implementation.
        """
        return

    def recv(self) -> Tuple[str, bytes]:
        """
        Overview:
            Block until a message arrives and return its topic and payload.
        Returns:
            - msg (:obj:`Tuple[str, bytes]`): The decoded topic and the raw payload. \
                ``None`` is returned instead when the queue is not running or the socket \
                has been closed.
        """
        while True:
            try:
                if not self._running:
                    break
                msg = self._sock.recv()
                # The topic travels as a plain prefix in front of the payload, so it can be
                # read without touching the payload itself.
                topic, payload = msg.split(b"::", maxsplit=1)
                return topic.decode(), payload
            except pynng.Timeout:
                logging.warning("Timeout on node {} when waiting for message from bus".format(self.listen_to))
            except pynng.Closed:
                if self._running:
                    logging.error("The socket was not closed under normal circumstances!")
                # A closed socket can never deliver another message; retrying would spin
                # forever, so leave the loop and return None like the stopped case.
                break
            except Exception as e:
                # Format the exception into the message: passing it as an extra positional
                # argument without a placeholder makes the logging call itself fail.
                logging.error("Meet exception when listening for new messages: {}".format(e))

    def stop(self) -> None:
        """
        Overview:
            Mark the queue as stopped, close the socket and drop the reference to it. \
            Calling it again after the queue has stopped is a no-op.
        """
        if self._running:
            # Clear the flag first so a concurrent ``recv`` treats the close as expected.
            self._running = False
            self._sock.close()
            self._sock = None

    def __del__(self) -> None:
        """
        Overview:
            Stop the queue when the object is garbage collected.
        """
        self.stop()
|
|