zjowowen's picture
init space
079c32c
raw
history blame
1.85 kB
from time import sleep
import uuid
import pytest
from multiprocessing import Pool
from unittest.mock import Mock, patch
from threading import Thread
from ding.utils import WatchDog
from ding.framework.message_queue.redis import RedisMQ
def redis_main(i):
node_id0 = uuid.uuid4().hex.encode()
class MockRedis(Mock):
def publish(self, topic, data):
assert topic == "t"
assert b"::" in data
def pubsub(self):
return MockPubSub()
class MockPubSub(Mock):
def get_message(self, **kwargs):
return {"channel": b"t", "data": node_id0 + b"::data"}
with patch("redis.Redis", MockRedis):
host = "127.0.0.1"
port = 6379
mq = RedisMQ(redis_host=host, redis_port=port)
mq.listen()
if i == 0:
mq._id = node_id0
def send_message():
for _ in range(5):
mq.publish("t", b"data")
sleep(0.1)
def recv_message():
# Should not receive any message
mq.subscribe("t")
print("RECV", mq.recv())
send_thread = Thread(target=send_message, daemon=True)
recv_thread = Thread(target=recv_message, daemon=True)
send_thread.start()
recv_thread.start()
send_thread.join()
watchdog = WatchDog(1)
with pytest.raises(TimeoutError):
watchdog.start()
recv_thread.join()
watchdog.stop()
else:
mq.subscribe("t")
topic, msg = mq.recv()
assert topic == "t"
assert msg == b"data"
@pytest.mark.unittest
@pytest.mark.execution_timeout(10)
def test_redis():
with Pool(processes=2) as pool:
pool.map(redis_main, range(2))