gomoku / DI-engine /ding /utils /tests /test_log_writer_helper.py
zjowowen's picture
init space
079c32c
import pytest
import time
import tempfile
import shutil
import os
from os import path
from ding.framework import Parallel
from ding.framework.task import task
from ding.utils import DistributedWriter
def main_distributed_writer(tempdir):
    """Per-node entry point used by the distributed writer test.

    Inside a started task, attaches a ``DistributedWriter`` rooted at
    ``tempdir`` to the task router (node 0 is flagged as the writer), then
    runs three task steps; each step logs ten scalars tagged with the node id.

    Args:
        tempdir: Directory path handed to ``DistributedWriter``.
    """
    with task.start():
        router = task.router
        node_id = router.node_id

        # Stagger start-up by node id: node 0 does not wait, node 1 waits 1s.
        # Original note: "Sleep 0 and 1, write to different files".
        time.sleep(node_id)

        tblogger = DistributedWriter(tempdir).plugin(router, is_writer=(node_id == 0))

        def _add_scalar(ctx):
            # Ten consecutive global steps per task step, so steps never overlap.
            scalars_per_step = 10
            first_step = ctx.total_step * scalars_per_step
            tag = str(node_id)
            for global_step in range(first_step, first_step + scalars_per_step):
                tblogger.add_scalar(tag, node_id, global_step)

        def _throttle(_ctx):
            # Short pause between task steps.
            time.sleep(0.2)

        task.use(_add_scalar)
        task.use(_throttle)
        task.run(max_step=3)

        # Node 0 lingers 2.3s and node 1 only 0.3s before leaving the task.
        # NOTE(review): presumably this keeps the writer node alive until the
        # other node's last scalars have arrived -- confirm against the router.
        linger_seconds = 0.3 + (1 - node_id) * 2
        time.sleep(linger_seconds)
@pytest.mark.unittest
def test_distributed_writer():
    """Check that two parallel workers leave exactly one entry in the log dir.

    Runs ``main_distributed_writer`` on two parallel workers that share one
    log directory, then asserts that the directory exists and contains a
    single entry.
    """
    # Work inside a uniquely named scratch directory instead of the fixed
    # "<system tmp>/tblogger" path: a leftover or concurrently used directory
    # of that name would break the entry count, and cleaning it up would
    # delete data this test did not create. The context manager removes the
    # scratch directory even when an assertion fails.
    with tempfile.TemporaryDirectory() as scratch_dir:
        # The log directory itself must not exist yet, so that the existence
        # assertion below still verifies that the run created it.
        tempdir = path.join(scratch_dir, "tblogger")
        Parallel.runner(n_parallel_workers=2)(main_distributed_writer, tempdir)
        assert path.exists(tempdir)
        assert len(os.listdir(tempdir)) == 1