File size: 2,524 Bytes
079c32c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import pynng
from ditk import logging
from typing import List, Optional, Tuple
from pynng import Bus0
from time import sleep

from ding.framework.message_queue.mq import MQ
from ding.utils import MQ_REGISTRY


@MQ_REGISTRY.register("nng")
class NNGMQ(MQ):
    """
    Overview:
        Message queue backend built on a pynng ``Bus0`` socket. Every published message is
        prefixed with ``<topic>::`` so that the receiver can read the topic without having to
        deserialize the payload.
    """

    def __init__(self, listen_to: str, attach_to: Optional[List[str]] = None, **kwargs) -> None:
        """
        Overview:
            Connect distributed processes with nng
        Arguments:
            - listen_to (:obj:`str`): The address this node listens on.
            - attach_to (:obj:`Optional[List[str]]`): The node's addresses you want to attach to.
            - kwargs: Accepted but not used by this class.
        """
        self.listen_to = listen_to
        self.attach_to = attach_to or []
        # The socket is only created in ``listen``; until then (and after ``stop``) it is None.
        self._sock: Optional[Bus0] = None
        self._running = False

    def listen(self) -> None:
        """
        Overview:
            Create the bus socket, listen on ``listen_to``, dial every address in ``attach_to``
            and mark the queue as running.
        """
        self._sock = sock = Bus0()
        sock.listen(self.listen_to)
        sleep(0.1)  # Wait for peers to bind
        for contact in self.attach_to:
            sock.dial(contact)
        logging.info("NNG listen on {}, attach to {}".format(self.listen_to, self.attach_to))
        self._running = True

    def publish(self, topic: str, data: bytes) -> None:
        """
        Overview:
            Send ``data`` on the bus, prefixed with ``<topic>::``. Does nothing when the queue
            is not running.
        Arguments:
            - topic (:obj:`str`): Topic name placed in front of the payload.
            - data (:obj:`bytes`): Payload bytes.
        """
        if self._running:
            topic += "::"
            data = topic.encode() + data
            self._sock.send(data)

    def subscribe(self, topic: str) -> None:
        """
        Overview:
            No-op in this backend; ``recv`` returns every message together with its topic.
        """
        return

    def unsubscribe(self, topic: str) -> None:
        """
        Overview:
            No-op in this backend, see ``subscribe``.
        """
        return

    def recv(self) -> Optional[Tuple[str, bytes]]:
        """
        Overview:
            Wait for the next message on the bus and split it into topic and payload.
        Returns:
            - msg (:obj:`Optional[Tuple[str, bytes]]`): ``(topic, payload)`` of the received \
                message, or ``None`` when the queue is not running or the socket has been closed.
        """
        while True:
            try:
                if not self._running:
                    break
                msg = self._sock.recv()
                # Use topic at the beginning of the message, so we don't need to call pickle.loads
                # when the current process is not subscribed to the topic.
                topic, payload = msg.split(b"::", maxsplit=1)
                return topic.decode(), payload
            except pynng.Timeout:
                logging.warning("Timeout on node {} when waiting for message from bus".format(self.listen_to))
            except pynng.Closed:
                if self._running:
                    logging.error("The socket was not closed under normal circumstances!")
                # A closed socket cannot deliver anything anymore; retrying would only raise
                # Closed again and spin forever, so stop receiving.
                break
            except Exception as e:
                # Format the exception into the message: passing it as an extra positional
                # argument without a placeholder makes the logging call itself fail to format.
                logging.error("Meet exception when listening for new messages: {}".format(e))
        return None

    def stop(self) -> None:
        """
        Overview:
            Mark the queue as stopped and close the socket. Calling it again is a no-op.
        """
        if self._running:
            # Clear the flag before closing so that ``recv`` treats the resulting
            # ``pynng.Closed`` as a normal shutdown.
            self._running = False
            self._sock.close()
            self._sock = None

    def __del__(self) -> None:
        self.stop()