File size: 2,233 Bytes
efe5745
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# generates periodic heartbeats for remote experiment monitoring
from pathlib import Path
import json
from inspect import stack
from .ticker import IntervalTicker

# Stack of HeartBeat objects whose `with` block is currently open.
# HeartBeat.__enter__ appends, HeartBeat.__exit__ pops, and get_heartbeat()
# returns the last (innermost) entry.
_CURRENT_BEAT_STACK = []


def get_heartbeat():
    """
    Returns:
        The :class:`HeartBeat` object that's currently being used, i.e. the
        one whose ``with`` block was entered most recently.

    Raises:
        AssertionError: if no :class:`HeartBeat` context is currently active.
    """
    # Raised explicitly instead of via `assert` so the check is not stripped
    # when Python runs with -O; the exception type stays AssertionError.
    if not _CURRENT_BEAT_STACK:
        raise AssertionError(
            "get_heartbeat() has to be called inside a 'with HeartBeat(...)' context!"
        )
    return _CURRENT_BEAT_STACK[-1]


def get_tqdm_meter(pbar, format_dict):
    """Render the right-hand part of a progress bar as plain text.

    Args:
        pbar: object providing ``format_meter(**kwargs)`` (e.g. a tqdm bar).
        format_dict: keyword arguments for ``pbar.format_meter``. It is read
            only; any ``bar_format`` entry in it is overridden on a copy.

    Returns:
        The string produced by ``pbar.format_meter`` with
        ``bar_format="{r_bar}"``, minus its first two characters.
    """
    # Build a new dict rather than assigning into the caller's mapping, so
    # the argument is left exactly as it was passed in.
    meter_kwargs = {**format_dict, "bar_format": "{r_bar}"}
    meter_str = pbar.format_meter(**meter_kwargs)
    # Drop the two leading characters -- presumably the "| " separator that
    # tqdm places in front of r_bar; confirm against the tqdm version in use.
    return meter_str[2:]


def caller_info(n_stack_up):
    """Describe one frame of the current call stack as text.

    Args:
        n_stack_up: number of frames above the immediate caller of this
            function to report on; ``0`` selects the immediate caller.

    Returns:
        A string of the form ``"<filename>:<lineno> - <function>"``.
    """
    # Index 0 of stack() is this function's own frame, hence the +1 offset.
    frame = stack()[n_stack_up + 1]
    filename, lineno, function = frame.filename, frame.lineno, frame.function
    return f"{filename}:{lineno} - {function}"


class HeartBeat:
    """Periodically dump the state of a progress bar to a JSON file.

    The file holds a single JSON object (see :meth:`stats`) plus a
    ``"caller"`` entry naming the code location that triggered the write.
    Used as a context manager, the instance registers itself on the
    module-level stack so that ``get_heartbeat()`` can return it.

    Args:
        pbar: progress bar exposing ``format_dict`` and ``format_meter``
            (tqdm-style).
        write_interval: handed to ``IntervalTicker``; presumably the minimum
            time between two regular writes -- confirm units in ``ticker``.
        output_dir: directory the heartbeat file is written to.
        fname: name of the heartbeat file inside ``output_dir``.
    """

    def __init__(
        self, pbar, write_interval=10,
        output_dir="./", fname="heartbeat.json"
    ):
        self.pbar = pbar
        self.fname = Path(output_dir) / fname
        self.ticker = IntervalTicker(write_interval)
        self.completed = False

        # Force one write at the beginning. n_stack_up=2 skips the frames of
        # beat() and __init__ so the recorded caller is the code that
        # constructed this object.
        self.beat(force_write=True, n_stack_up=2)

    def beat(self, force_write=False, n_stack_up=1):
        """Register a beat and write the heartbeat file when one is due.

        Args:
            force_write: write even if the ticker does not report a write
                period.
            n_stack_up: how many frames above this method the reported
                caller lives; the default of 1 is the direct caller.
        """
        # tick() is invoked on every beat, including forced ones.
        on_write_period = self.ticker.tick()
        if not (force_write or on_write_period):
            return

        stats = self.stats()
        # Must be called directly from beat(): n_stack_up is counted from
        # this frame.
        stats['caller'] = caller_info(n_stack_up)
        self._write(stats)

    def _write(self, stats):
        """Serialize ``stats`` to ``self.fname`` without exposing partial data."""
        # Write to a sibling temp file and swap it in, so a monitor that
        # polls the heartbeat file never reads a half-written document.
        tmp_path = self.fname.with_name(self.fname.name + ".tmp")
        with open(tmp_path, "w") as f:
            json.dump(stats, f)
        tmp_path.replace(self.fname)

    def done(self):
        """Mark the run as completed and force a final write."""
        self.completed = True
        # n_stack_up=2 skips beat() and done() so the caller of done() is
        # what gets recorded.
        self.beat(force_write=True, n_stack_up=2)

    def stats(self):
        """Collect the current heartbeat payload.

        Returns:
            A dict with the keys ``"beat"`` (``self.ticker.tick_str()``),
            ``"done"`` (the completed flag), ``"meter"`` (text from
            ``get_tqdm_meter``) and ``"elapsed"`` (``format_dict['elapsed']``
            truncated to an int; presumably seconds -- confirm).
        """
        pbar = self.pbar
        fdict = pbar.format_dict
        return {
            "beat": self.ticker.tick_str(),
            "done": self.completed,
            "meter": get_tqdm_meter(pbar, fdict),
            "elapsed": int(fdict['elapsed']),
        }

    def __enter__(self):
        """Push this instance onto the stack of active heartbeats."""
        _CURRENT_BEAT_STACK.append(self)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Pop this instance from the stack; exceptions are not suppressed."""
        # Contexts must be closed in reverse order of entry; identity (not
        # equality) is what matters here.
        assert _CURRENT_BEAT_STACK[-1] is self
        _CURRENT_BEAT_STACK.pop()