File size: 4,580 Bytes
079c32c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
from collections import defaultdict
import pytest
import time
from ding.framework import Parallel
def parallel_main():
    """Worker body: keep emitting "ping" until this node's own handler records it.

    Every worker registers a "test_callback" handler that flags the key it receives,
    waits for the other nodes to bind, then emits "ping" up to 30 times (0.03 s
    apart) until the flag is set. It asserts the flag was set, then sleeps again so
    peers can still exchange messages before this worker leaves.
    """
    received = defaultdict(bool)

    def _mark_received(key):
        received[key] = True

    router = Parallel()
    router.on("test_callback", _mark_received)

    # Give every node time to bind before sending anything.
    time.sleep(0.7)

    attempts_left = 30
    while attempts_left:
        attempts_left -= 1
        router.emit("test_callback", "ping")
        if received["ping"]:
            break
        time.sleep(0.03)
    assert received["ping"]

    # Linger so the other nodes can still reach this one before parallel mode is torn down.
    time.sleep(0.7)
@pytest.mark.tmp
def test_parallel_run():
    """Run ``parallel_main`` on two workers: once with no explicit protocol, once over TCP."""
    for protocol_kwargs in ({}, {"protocol": "tcp"}):
        Parallel.runner(n_parallel_workers=2, startup_interval=0.1, **protocol_kwargs)(parallel_main)
def uncaught_exception_main():
    """Worker body: node 0 raises after 0.1 s; every other node just sleeps 0.2 s and returns."""
    router = Parallel()
    if router.node_id != 0:
        time.sleep(0.2)
        return
    time.sleep(0.1)
    raise Exception("uncaught exception")
@pytest.mark.tmp
def test_uncaught_exception():
    """An exception raised inside one worker must make ``Parallel.runner`` fail in the parent."""
    # Make one process crash; the parent process is then expected to fail as well and
    # report the error from the crashed process.
    with pytest.raises(Exception) as exc_info:
        Parallel.runner(n_parallel_workers=2, topology="mesh", startup_interval=0.1)(uncaught_exception_main)
    # Use the public ExceptionInfo.value rather than the private _excinfo tuple.
    assert "uncaught exception" in str(exc_info.value)
def disconnected_main():
    """Worker body: emitting must keep working after the receiving node has exited.

    Node 0 waits up to about one second for at least one "greeting" event, asserts it
    got one, then returns. Every other node emits ten greetings 0.1 s apart, whether
    or not node 0 is still running, and must finish the loop without error.
    """
    router = Parallel()

    if router.node_id == 0:
        time.sleep(0.1)
        # Receive at least one message, then exit.
        greets = []
        router.on("greeting", lambda: greets.append("."))
        for _ in range(10):
            # Check for "any message" rather than exactly one: several greetings can
            # arrive between polls, and an exact-count check would then never match.
            if greets:
                break
            time.sleep(0.1)
        assert len(greets) > 0
    else:
        # Send 10 greetings even after the target process has exited.
        for i in range(10):
            router.emit("greeting")
            time.sleep(0.1)
        assert i == 9
@pytest.mark.tmp
def test_disconnected():
    """A node exiting normally must not break the nodes that keep sending to it.

    The remaining processes keep running even though their messages are no longer
    received by anyone.
    """
    run_in_mesh = Parallel.runner(n_parallel_workers=2, topology="mesh", startup_interval=0.1)
    run_in_mesh(disconnected_main)
class AutoRecover:
    """Three-node scenario used to exercise ``Parallel.runner(auto_recover=True)``.

    ``main`` sends each worker to its role by ``node_id``:
    node 0 listens for node 1's recovery notice, node 1 fails on its first run and
    reports recovery on its retry, and node 2 keeps sending messages to node 1.
    """

    @classmethod
    def main_p0(cls):
        # Collect every "greeting_0" payload and wait (up to ~5 s) until the newest
        # one is node 1's "recovered_p1" notice.
        greets = []
        router = Parallel()
        router.on("greeting_0", lambda msg: greets.append(msg))

        def _got_recovery_notice():
            return bool(greets) and greets[-1] == "recovered_p1"

        for _ in range(50):
            if _got_recovery_notice():
                break
            time.sleep(0.1)
        assert _got_recovery_notice()

    @classmethod
    def main_p1(cls):
        # router._retries == 0: send ten empty greetings to node 0, then raise "P1 Error".
        # router._retries == 1: send ten "recovered_p1" greetings to node 0, then wait
        #                       (up to ~2 s) for at least one "greeting_1" from node 2.
        # Any other value of router._retries raises "Failed too many times".
        greets = []
        router = Parallel()
        router.on("greeting_1", lambda msg: greets.append(msg))

        attempt = router._retries
        if attempt == 0:
            payload = ""
        elif attempt == 1:
            payload = "recovered_p1"
        else:
            raise Exception("Failed too many times")

        for _ in range(10):
            router.emit("greeting_0", payload)
            time.sleep(0.1)
        if attempt == 0:
            raise Exception("P1 Error")

        # After recovering, check that messages from node 2 still arrive.
        for _ in range(20):
            if greets:
                break
            time.sleep(0.1)
        assert len(greets) > 0

    @classmethod
    def main_p2(cls):
        # Emit 20 empty "greeting_1" messages to node 1, 0.1 s apart.
        router = Parallel()
        sent = 0
        while sent < 20:
            router.emit("greeting_1", "")
            time.sleep(0.1)
            sent += 1

    @classmethod
    def main(cls):
        # Entry point for every worker: run the role matching this node's id.
        node_id = Parallel().node_id
        if node_id == 0:
            return cls.main_p0()
        if node_id == 1:
            return cls.main_p1()
        if node_id == 2:
            return cls.main_p2()
        raise Exception("Invalid node id")
@pytest.mark.tmp
def test_auto_recover():
    """Node 1's single failure must be tolerated with one retry and surface with none."""
    # With max_retries=1, node 1 is expected to be restarted after "P1 Error" and the run to succeed.
    Parallel.runner(
        n_parallel_workers=3, topology="mesh", auto_recover=True, max_retries=1, startup_interval=0.1
    )(AutoRecover.main)
    # With max_retries=0, the failure is expected to propagate to the parent process.
    with pytest.raises(Exception) as exc_info:
        Parallel.runner(
            n_parallel_workers=3, topology="mesh", auto_recover=True, max_retries=0, startup_interval=0.1
        )(AutoRecover.main)
    # Use the public ExceptionInfo.value rather than the private _excinfo tuple.
    assert "P1 Error" in str(exc_info.value)
|