|
from time import sleep |
|
import pytest |
|
|
|
import multiprocessing as mp |
|
from ding.framework.message_queue.nng import NNGMQ |
|
|
|
|
|
def nng_main(i):
    """Run one side of the two-process NNGMQ round-trip check.

    Args:
        i: Worker index. ``0`` acts as the publisher; any other value acts
            as the receiver.

    The publisher listens on port 50515 and publishes ``b"data"`` on topic
    ``"t"`` ten times, pausing 0.1 s after each publish. The receiver listens
    on port 50516 with the publisher's address passed as ``attach_to``, takes
    a single message from ``recv()`` and asserts its topic and payload.
    """
    is_publisher = i == 0

    # Build the queue with the role-specific endpoints.
    if is_publisher:
        mq = NNGMQ(listen_to="tcp://127.0.0.1:50515", attach_to=None)
    else:
        mq = NNGMQ(
            listen_to="tcp://127.0.0.1:50516",
            attach_to=["tcp://127.0.0.1:50515"],
        )
    mq.listen()

    if is_publisher:
        # Publish repeatedly with a short pause between sends.
        # NOTE(review): presumably this gives the receiver time to connect
        # before the last message goes out; confirm.
        for _ in range(10):
            mq.publish("t", b"data")
            sleep(0.1)
        return

    # Receiver side: one message is enough to validate the round trip.
    topic, msg = mq.recv()
    assert topic == "t"
    assert msg == b"data"
|
|
|
|
|
@pytest.mark.unittest
@pytest.mark.execution_timeout(10)
def test_nng():
    """Run ``nng_main`` for worker indices 0 and 1 in a spawned process pool.

    A two-process pool created from the ``spawn`` multiprocessing context maps
    ``nng_main`` over ``range(2)``. ``Pool.map`` re-raises any exception from
    a worker, so a failed assertion in ``nng_main`` fails this test.
    """
    spawn_ctx = mp.get_context("spawn")
    worker_ids = range(2)
    with spawn_ctx.Pool(processes=2) as workers:
        workers.map(nng_main, worker_ids)
|
|