File size: 4,580 Bytes
079c32c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
from collections import defaultdict
import pytest
import time
from ding.framework import Parallel


def parallel_main():
    """Check that an event emitted on the router reaches a listener registered on it.

    A callback is registered for ``"test_callback"``; ``"ping"`` is then emitted
    up to 30 times (30 ms apart) until the callback has recorded it.
    """
    received = defaultdict(bool)

    def test_callback(payload):
        received[payload] = True

    router = Parallel()
    router.on("test_callback", test_callback)
    # Give the nodes time to bind before emitting anything.
    time.sleep(0.7)

    attempts_left = 30
    while attempts_left > 0:
        attempts_left -= 1
        router.emit("test_callback", "ping")
        if received["ping"]:
            break
        time.sleep(0.03)
    assert received["ping"]

    # Linger before returning so that workers which leave the parallel context
    # early do not prevent the others from receiving each other's messages.
    time.sleep(0.7)


@pytest.mark.tmp
def test_parallel_run():
    """Run ``parallel_main`` on two workers, first with the default protocol, then with ``"tcp"``."""
    runner_configs = (
        dict(n_parallel_workers=2, startup_interval=0.1),
        dict(n_parallel_workers=2, protocol="tcp", startup_interval=0.1),
    )
    for config in runner_configs:
        launch = Parallel.runner(**config)
        launch(parallel_main)


def uncaught_exception_main():
    """Worker body: node 0 raises after a short delay, every other node just sleeps.

    Raises:
        Exception: With message ``"uncaught exception"`` when run on node 0.
    """
    router = Parallel()
    if router.node_id != 0:
        # Non-failing nodes stay alive slightly longer than the failing one.
        time.sleep(0.2)
        return
    time.sleep(0.1)
    raise Exception("uncaught exception")


@pytest.mark.tmp
def test_uncaught_exception():
    """Expect a crash in one worker to surface as an exception in the calling process.

    ``uncaught_exception_main`` raises on node 0; the runner call is expected to
    raise an exception whose text contains the worker's error message.
    """
    # Make one process crash, then the parent process will also crash and output the stack of the wrong process.
    with pytest.raises(Exception) as exc_info:
        Parallel.runner(n_parallel_workers=2, topology="mesh", startup_interval=0.1)(uncaught_exception_main)
    # ``ExceptionInfo.value`` is the public accessor for the raised exception;
    # the private ``_excinfo`` tuple is an implementation detail of pytest.
    assert "uncaught exception" in str(exc_info.value)


def disconnected_main():
    """Worker body for the disconnect test.

    Node 0 listens for ``"greeting"`` events and returns as soon as at least one
    has arrived (polling for up to 10 rounds of 0.1 s). Every other node emits
    10 greetings, 0.1 s apart, regardless of whether node 0 is still running.
    """
    router = Parallel()

    if router.node_id == 0:
        time.sleep(0.1)
        # Wait until at least one greeting has been received, then exit
        greets = []
        router.on("greeting", lambda: greets.append("."))
        for _ in range(10):
            # Truthiness rather than ``== 1``: several greetings may arrive
            # between two polls, which must still end the wait.
            if greets:
                break
            time.sleep(0.1)
        assert len(greets) > 0
    else:
        # Send 10 greetings even if the target process has exited
        for i in range(10):
            router.emit("greeting")
            time.sleep(0.1)
        # Reaching this point with i == 9 means no emit raised along the way.
        assert i == 9


@pytest.mark.tmp
def test_disconnected():
    """Run ``disconnected_main`` on two mesh-connected workers and expect no error.

    One process exits normally while the other keeps emitting, even though its
    messages are no longer received by anyone.
    """
    launch = Parallel.runner(n_parallel_workers=2, topology="mesh", startup_interval=0.1)
    launch(disconnected_main)


class AutoRecover:
    """Worker entry points for the auto-recover test, one per node id (0, 1 and 2)."""

    @classmethod
    def main_p0(cls):
        """Node 0: collect ``greeting_0`` messages until the latest is ``"recovered_p1"``."""
        inbox = []
        router = Parallel()
        router.on("greeting_0", lambda msg: inbox.append(msg))

        def saw_recovery():
            return bool(inbox) and inbox[-1] == "recovered_p1"

        # Poll up to 50 times, 0.1 s apart, for the recovery message from p1.
        polls = 0
        while polls < 50 and not saw_recovery():
            time.sleep(0.1)
            polls += 1
        assert saw_recovery()

    @classmethod
    def main_p1(cls):
        """Node 1: fail on the first run, announce recovery on the second, then hear from p2.

        On the first run (``router._retries == 0``) ten empty messages are sent to
        p0 and an exception is raised. On the second run (``router._retries == 1``)
        ten ``"recovered_p1"`` messages are sent instead, after which the node
        waits for a ``greeting_1`` message.

        Raises:
            Exception: ``"P1 Error"`` on the first run; ``"Failed too many times"``
                for any retry count other than 0 or 1.
        """
        inbox = []
        router = Parallel()
        router.on("greeting_1", lambda msg: inbox.append(msg))

        attempt = router._retries
        if attempt not in (0, 1):
            raise Exception("Failed too many times")

        # Both permitted attempts send ten messages to p0; only the payload differs.
        payload = "" if attempt == 0 else "recovered_p1"
        for _ in range(10):
            router.emit("greeting_0", payload)
            time.sleep(0.1)
        if attempt == 0:
            raise Exception("P1 Error")

        # After recovering, check that messages from p2 are still received.
        polls = 0
        while polls < 20 and not inbox:
            time.sleep(0.1)
            polls += 1
        assert len(inbox) > 0

    @classmethod
    def main_p2(cls):
        """Node 2: emit twenty empty ``greeting_1`` messages, 0.1 s apart."""
        router = Parallel()
        remaining = 20
        while remaining > 0:
            router.emit("greeting_1", "")
            time.sleep(0.1)
            remaining -= 1

    @classmethod
    def main(cls):
        """Dispatch to the per-node routine selected by the router's ``node_id``.

        Raises:
            Exception: ``"Invalid node id"`` when the node id is not 0, 1 or 2.
        """
        router = Parallel()
        routines = {0: cls.main_p0, 1: cls.main_p1, 2: cls.main_p2}
        routine = routines.get(router.node_id)
        if routine is None:
            raise Exception("Invalid node id")
        routine()


@pytest.mark.tmp
def test_auto_recover():
    """Run ``AutoRecover.main`` on three workers with and without a retry budget.

    With ``max_retries=1`` the run is expected to complete without raising. With
    ``max_retries=0`` the runner call is expected to raise an exception whose
    text contains ``"P1 Error"``, the message raised by node 1 on its first run.
    """
    # With max_retries=1
    Parallel.runner(
        n_parallel_workers=3, topology="mesh", auto_recover=True, max_retries=1, startup_interval=0.1
    )(AutoRecover.main)
    # With max_retries=0
    with pytest.raises(Exception) as exc_info:
        Parallel.runner(
            n_parallel_workers=3, topology="mesh", auto_recover=True, max_retries=0, startup_interval=0.1
        )(AutoRecover.main)
    # ``ExceptionInfo.value`` is the public accessor for the raised exception;
    # the private ``_excinfo`` tuple is an implementation detail of pytest.
    assert "P1 Error" in str(exc_info.value)