|
|
|
from pathlib import Path |
|
import json |
|
import os |
|
from contextlib import contextmanager |
|
from .ticker import IntervalTicker |
|
|
|
|
|
# Stack of the EventStorage objects whose ``with`` blocks are currently active.
# EventStorage.__enter__ appends to it and EventStorage.__exit__ pops from it,
# so the innermost (most recently entered) storage is always the last element.
_CURRENT_STORAGE_STACK = []
|
|
|
|
|
def get_event_storage():
    """
    Returns:
        The :class:`EventStorage` object that's currently being used, i.e.
        the last entry of ``_CURRENT_STORAGE_STACK``.

    Raises:
        AssertionError: if no :class:`EventStorage` is currently enabled.
    """
    # Raised explicitly instead of via an ``assert`` statement so the check
    # (and its message) survives ``python -O``, which strips asserts and would
    # otherwise let the lookup below fail with a bare IndexError.
    if not _CURRENT_STORAGE_STACK:
        raise AssertionError(
            "get_event_storage() has to be called inside a 'with EventStorage(...)' context!"
        )
    return _CURRENT_STORAGE_STACK[-1]
|
|
|
|
|
def read_lined_json(fname):
    """Lazily yield the decoded objects of a JSON-lines file.

    Args:
        fname: path (``str`` or ``Path``) of a file holding one JSON document
            per line.

    Yields:
        The object decoded from each non-blank line, in file order.

    Raises:
        json.JSONDecodeError: if a non-blank line is not valid JSON.
    """
    # Decode explicitly as UTF-8: EventStorage writes history.json with
    # encoding='utf8' and ensure_ascii=False, so relying on the platform
    # default encoding could fail or mis-decode non-ASCII values.
    with Path(fname).open('r', encoding='utf8') as f:
        for line in f:
            # A blank line carries no record; json.loads would reject it.
            if not line.strip():
                continue
            yield json.loads(line)
|
|
|
|
|
def read_stats(dirname, key):
    """Collect the logged values of ``key`` from ``<dirname>/history.json``.

    Args:
        dirname: directory expected to contain ``history.json``; may be ``None``.
        key: name of the logged entry to extract.

    Returns:
        A pair ``(xs, ys)`` of lists: ``xs`` holds the ``'iter'`` field and
        ``ys`` the ``key`` field of every record that contains ``key``.
        Both lists are empty when ``dirname`` is ``None`` or the history file
        does not exist.
    """
    if dirname is None:
        return [], []

    history_path = Path(dirname) / "history.json"
    if not history_path.is_file():
        return [], []

    # Keep only the records that actually logged the requested key.
    matching = [record for record in read_lined_json(history_path) if key in record]
    iterations = [record['iter'] for record in matching]
    values = [record[key] for record in matching]
    return iterations, values
|
|
|
|
|
class EventStorage():
    """Per-iteration key/value logger backed by a JSON-lines ``history.json``.

    Values recorded with :meth:`put` accumulate in ``curr_buffer`` for the
    current iteration.  :meth:`step` moves that buffer into ``history`` and
    advances ``iter``.  When the storage is writable, :meth:`flush_history`
    appends every pending record, one JSON object per line, to
    ``<output_dir>/history.json``.

    The storage only becomes writable when it is entered as a context manager
    with an ``output_dir`` that is not ``None``; entering also pushes it onto
    ``_CURRENT_STORAGE_STACK`` and exiting pops it and closes the file.
    """

    def __init__(self, output_dir="./hotdog", start_iter=0, flush_period=60):
        """Create a storage; no file or directory is touched until ``__enter__``.

        Args:
            output_dir: directory for ``history.json`` and artifacts, or
                ``None`` to keep the storage non-writable.
            start_iter: initial value of the iteration counter.
            flush_period: forwarded unchanged to ``IntervalTicker``; its unit
                is defined there (presumably seconds -- confirm in ``.ticker``).
        """
        self.iter = start_iter
        self.ticker = IntervalTicker(flush_period)
        self.history = []
        self._current_prefix = ""
        self._init_curr_buffer_()

        self.output_dir = output_dir
        # Switched on only by __enter__, so nothing is written before entry.
        self.writable = False

    def _open(self):
        """Create ``output_dir`` if needed and open ``history.json`` for appending."""
        if self.writable:
            output_dir = Path(self.output_dir)
            if not output_dir.is_dir():
                output_dir.mkdir(parents=True, exist_ok=True)
            json_fname = output_dir / 'history.json'

            self._file_handle = json_fname.open('a', encoding='utf8')
            # Normalise to a Path: put_artifact and nested storages use ``/`` on it.
            self.output_dir = output_dir

    def _init_curr_buffer_(self):
        """Start a fresh record for the current iteration."""
        self.curr_buffer = {'iter': self.iter}

    def step(self, flush=False):
        """Finish the current iteration and begin the next one.

        The current buffer is appended to ``history``; the history is flushed
        when ``flush`` is true or when ``self.ticker.tick()`` returns a truthy
        value.  Afterwards ``iter`` is incremented and a new buffer is started.
        """
        self.history.append(self.curr_buffer)

        on_flush_period = self.ticker.tick()
        if flush or on_flush_period:
            self.flush_history()

        self.iter += 1
        self._init_curr_buffer_()

    def flush_history(self):
        """Write pending records to ``history.json`` (if writable) and clear them.

        ``history`` is emptied even when the storage is not writable, in which
        case the pending records are simply discarded.
        """
        if self.writable:
            for item in self.history:
                line = json.dumps(item, sort_keys=True, ensure_ascii=False) + "\n"
                self._file_handle.write(line)
            self._file_handle.flush()
        self.history = []

    def full_key(self, key):
        """Return ``key`` with the current prefix prepended; ``key`` must be a ``str``."""
        assert isinstance(key, str)
        name = self._current_prefix + key
        return name

    def put(self, key, val):
        """Record ``val`` under the prefixed ``key`` in the current buffer.

        ``val`` must be an ``int``, ``float`` or ``str``; floats are rounded
        to 3 decimal places before being stored.
        """
        key = self.full_key(key)
        assert isinstance(val, (int, float, str))
        if isinstance(val, float):
            val = round(val, 3)
        self.curr_buffer[key] = val

    def put_scalars(self, **kwargs):
        """Record every keyword argument via :meth:`put`."""
        for k, v in kwargs.items():
            self.put(k, v)

    def put_artifact(self, key, ext, p, save_func):
        """Save an artifact through ``save_func`` and record its path under ``key``.

        The file is named ``<output_dir>/<key>/step_<iter>_<p><ext>``, with
        spaces in ``p`` replaced by underscores.

        Args:
            key: sub-directory name and the key the path is recorded under.
            ext: file extension, with or without the leading dot; may be empty.
            p: free-form label embedded in the file name.
            save_func: callable invoked with the destination path (a ``str``).

        Returns:
            The destination path as a ``str``, or ``None`` when the storage is
            not writable (``save_func`` is not called in that case).
        """
        if not self.writable:
            return None

        label = p.replace(" ", "_")
        if ext and not ext.startswith("."):
            ext = "." + ext

        artifact_dir = self.output_dir / key
        os.makedirs(artifact_dir, exist_ok=True)
        # Append the extension rather than using Path.with_suffix(): that
        # would replace everything after the last dot in ``label`` and so
        # truncate labels such as "v1.5" or "a dog. sitting".
        fname = str(artifact_dir / f"step_{self.iter}_{label}{ext}")

        # save_func runs before put() so the key is only recorded when the
        # save did not raise.
        save_func(fname)
        self.put(key, fname)
        return fname

    def close(self):
        """Flush pending records and close ``history.json`` if it was opened."""
        try:
            self.flush_history()
        finally:
            # Release the handle even when flushing fails (e.g. on a record
            # that cannot be serialised), so the file is never leaked.
            if self.writable:
                self._file_handle.close()

    def get_last(self):
        """Return the most recent un-flushed record, or ``None`` if there is none."""
        if self.history:
            return self.history[-1]
        return None

    def __enter__(self):
        """Activate this storage and make it the current one.

        If another storage is already active and both it and this storage have
        an output directory, this storage is nested inside the parent: its
        directory becomes ``<parent.output_dir>/<output_dir>_<parent.iter>``
        and the parent records that path under the key ``str(output_dir)``
        (the directory as originally given).
        """
        if len(_CURRENT_STORAGE_STACK) > 0:
            parent = _CURRENT_STORAGE_STACK[-1]
            root, dirname = parent.output_dir, self.output_dir
            if root is not None and dirname is not None:
                child_dir = parent.output_dir / f"{self.output_dir}_{parent.iter}"
                self.output_dir = child_dir
                parent.put(str(dirname), str(child_dir))

        if self.output_dir is not None:
            self.writable = True
            self._open()

        _CURRENT_STORAGE_STACK.append(self)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Deactivate this storage (it must be the innermost one) and close it."""
        assert _CURRENT_STORAGE_STACK[-1] == self
        _CURRENT_STORAGE_STACK.pop()
        self.close()
|
|