import uuid
from multiprocessing import Pool
from threading import Thread
from time import sleep
from unittest.mock import Mock, patch

import pytest

from ding.framework.message_queue.redis import RedisMQ
from ding.utils import WatchDog


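def redis_main(i):
    """Run one simulated node; ``i`` is the worker index supplied by the pool."""
    # Sender id that the mocked pubsub prepends to every message it returns.
    node_id0 = uuid.uuid4().hex.encode()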

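    class MockRedis(Mock):
        """Stand-in for ``redis.Redis`` so the test needs no running server."""

        def publish(self, topic, data):
            # Published payloads must target topic "t" and contain the "::" separator.
            assert topic == "t"
            assert b"::" in data

        def pubsub(self):
            return MockPubSub()

    class MockPubSub(Mock):
        """Pubsub stub that always returns the same message from ``node_id0``."""

        def get_message(self, **kwargs):
            return {"channel": b"t", "data": node_id0 + b"::data"}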

    with patch("redis.Redis", MockRedis):
        host = "127.0.0.1"
        port = 6379
        mq = RedisMQ(redis_host=host, redis_port=port)
        mq.listen()
        if i == 0:
            mq._id = node_id0

            def send_message():
                for _ in range(5):
                    mq.publish("t", b"data")
                    sleep(0.1)

            def recv_message():
                # Every mocked message carries this node's own id, so recv()
                # should never return and the print below should not run.
                mq.subscribe("t")
                print("RECV", mq.recv())

            send_thread = Thread(target=send_message, daemon=True)
            recv_thread = Thread(target=recv_message, daemon=True)
            send_thread.start()
            recv_thread.start()

            send_thread.join()

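            # recv_thread is expected to block, so joining it must be
            # interrupted by the watchdog's TimeoutError after 1 second.
            watchdog = WatchDog(1)
            with pytest.raises(TimeoutError):
                watchdog.start()
                recv_thread.join()
            watchdog.stop()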
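        else:
            # This node keeps its own id, which differs from node_id0, so it
            # should receive the payload without the sender-id prefix.
            mq.subscribe("t")
            topic, msg = mq.recv()
            assert topic == "t"
            assert msg == b"data"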


@pytest.mark.unittest
@pytest.mark.execution_timeout(10)
def test_redis():
    """Run two nodes in separate processes: node 0 sends, node 1 receives."""
    with Pool(processes=2) as pool:
        pool.map(redis_main, range(2))