|
from time import sleep |
|
import uuid |
|
import pytest |
|
|
|
from multiprocessing import Pool |
|
from unittest.mock import Mock, patch |
|
from threading import Thread |
|
from ding.utils import WatchDog |
|
|
|
from ding.framework.message_queue.redis import RedisMQ |
|
|
|
|
|
def redis_main(i): |
|
node_id0 = uuid.uuid4().hex.encode() |
|
|
|
class MockRedis(Mock): |
|
|
|
def publish(self, topic, data): |
|
assert topic == "t" |
|
assert b"::" in data |
|
|
|
def pubsub(self): |
|
return MockPubSub() |
|
|
|
class MockPubSub(Mock): |
|
|
|
def get_message(self, **kwargs): |
|
return {"channel": b"t", "data": node_id0 + b"::data"} |
|
|
|
with patch("redis.Redis", MockRedis): |
|
host = "127.0.0.1" |
|
port = 6379 |
|
mq = RedisMQ(redis_host=host, redis_port=port) |
|
mq.listen() |
|
if i == 0: |
|
mq._id = node_id0 |
|
|
|
def send_message(): |
|
for _ in range(5): |
|
mq.publish("t", b"data") |
|
sleep(0.1) |
|
|
|
def recv_message(): |
|
|
|
mq.subscribe("t") |
|
print("RECV", mq.recv()) |
|
|
|
send_thread = Thread(target=send_message, daemon=True) |
|
recv_thread = Thread(target=recv_message, daemon=True) |
|
send_thread.start() |
|
recv_thread.start() |
|
|
|
send_thread.join() |
|
|
|
watchdog = WatchDog(1) |
|
with pytest.raises(TimeoutError): |
|
watchdog.start() |
|
recv_thread.join() |
|
watchdog.stop() |
|
else: |
|
mq.subscribe("t") |
|
topic, msg = mq.recv() |
|
assert topic == "t" |
|
assert msg == b"data" |
|
|
|
|
|
@pytest.mark.unittest |
|
@pytest.mark.execution_timeout(10) |
|
def test_redis(): |
|
with Pool(processes=2) as pool: |
|
pool.map(redis_main, range(2)) |
|
|