|
import pytest |
|
import time |
|
import numpy as np |
|
import torch |
|
from multiprocessing import Process |
|
|
|
from ding.envs.env_manager.subprocess_env_manager import ShmBuffer |
|
|
|
|
|
def writer(shm): |
|
while True: |
|
shm.fill(np.random.random(size=(4, 84, 84)).astype(np.float32)) |
|
time.sleep(1) |
|
|
|
|
|
@pytest.mark.unittest |
|
def test_shm(): |
|
|
|
shm = ShmBuffer(dtype=np.float32, shape=(4, 84, 84), copy_on_get=False) |
|
writer_process = Process(target=writer, args=(shm, )) |
|
writer_process.start() |
|
|
|
time.sleep(0.1) |
|
|
|
data1 = shm.get() |
|
time.sleep(1) |
|
data2 = shm.get() |
|
|
|
assert (data1 == data2).all() |
|
|
|
time.sleep(1) |
|
data3 = shm.get().copy() |
|
time.sleep(1) |
|
data4 = shm.get() |
|
assert (data3 != data4).all() |
|
|
|
writer_process.terminate() |
|
|