Leffa / detectron2 /utils /events.py
franciszzj's picture
init code
b213d84
# Copyright (c) Facebook, Inc. and its affiliates.
import datetime
import json
import logging
import os
import time
from collections import defaultdict
from contextlib import contextmanager
from functools import cached_property
from typing import Optional
import torch
from fvcore.common.history_buffer import HistoryBuffer
from detectron2.utils.file_io import PathManager
__all__ = [
"get_event_storage",
"has_event_storage",
"JSONWriter",
"TensorboardXWriter",
"CommonMetricPrinter",
"EventStorage",
]
_CURRENT_STORAGE_STACK = []
def get_event_storage():
"""
Returns:
The :class:`EventStorage` object that's currently being used.
Throws an error if no :class:`EventStorage` is currently enabled.
"""
assert len(
_CURRENT_STORAGE_STACK
), "get_event_storage() has to be called inside a 'with EventStorage(...)' context!"
return _CURRENT_STORAGE_STACK[-1]
def has_event_storage():
"""
Returns:
Check if there are EventStorage() context existed.
"""
return len(_CURRENT_STORAGE_STACK) > 0
class EventWriter:
"""
Base class for writers that obtain events from :class:`EventStorage` and process them.
"""
def write(self):
raise NotImplementedError
def close(self):
pass
class JSONWriter(EventWriter):
"""
Write scalars to a json file.
It saves scalars as one json per line (instead of a big json) for easy parsing.
Examples parsing such a json file:
::
$ cat metrics.json | jq -s '.[0:2]'
[
{
"data_time": 0.008433341979980469,
"iteration": 19,
"loss": 1.9228371381759644,
"loss_box_reg": 0.050025828182697296,
"loss_classifier": 0.5316952466964722,
"loss_mask": 0.7236229181289673,
"loss_rpn_box": 0.0856662318110466,
"loss_rpn_cls": 0.48198649287223816,
"lr": 0.007173333333333333,
"time": 0.25401854515075684
},
{
"data_time": 0.007216215133666992,
"iteration": 39,
"loss": 1.282649278640747,
"loss_box_reg": 0.06222952902317047,
"loss_classifier": 0.30682939291000366,
"loss_mask": 0.6970193982124329,
"loss_rpn_box": 0.038663312792778015,
"loss_rpn_cls": 0.1471673548221588,
"lr": 0.007706666666666667,
"time": 0.2490077018737793
}
]
$ cat metrics.json | jq '.loss_mask'
0.7126231789588928
0.689423680305481
0.6776131987571716
...
"""
def __init__(self, json_file, window_size=20):
"""
Args:
json_file (str): path to the json file. New data will be appended if the file exists.
window_size (int): the window size of median smoothing for the scalars whose
`smoothing_hint` are True.
"""
self._file_handle = PathManager.open(json_file, "a")
self._window_size = window_size
self._last_write = -1
def write(self):
storage = get_event_storage()
to_save = defaultdict(dict)
for k, (v, iter) in storage.latest_with_smoothing_hint(self._window_size).items():
# keep scalars that have not been written
if iter <= self._last_write:
continue
to_save[iter][k] = v
if len(to_save):
all_iters = sorted(to_save.keys())
self._last_write = max(all_iters)
for itr, scalars_per_iter in to_save.items():
scalars_per_iter["iteration"] = itr
self._file_handle.write(json.dumps(scalars_per_iter, sort_keys=True) + "\n")
self._file_handle.flush()
try:
os.fsync(self._file_handle.fileno())
except AttributeError:
pass
def close(self):
self._file_handle.close()
class TensorboardXWriter(EventWriter):
"""
Write all scalars to a tensorboard file.
"""
def __init__(self, log_dir: str, window_size: int = 20, **kwargs):
"""
Args:
log_dir (str): the directory to save the output events
window_size (int): the scalars will be median-smoothed by this window size
kwargs: other arguments passed to `torch.utils.tensorboard.SummaryWriter(...)`
"""
self._window_size = window_size
self._writer_args = {"log_dir": log_dir, **kwargs}
self._last_write = -1
@cached_property
def _writer(self):
from torch.utils.tensorboard import SummaryWriter
return SummaryWriter(**self._writer_args)
def write(self):
storage = get_event_storage()
new_last_write = self._last_write
for k, (v, iter) in storage.latest_with_smoothing_hint(self._window_size).items():
if iter > self._last_write:
self._writer.add_scalar(k, v, iter)
new_last_write = max(new_last_write, iter)
self._last_write = new_last_write
# storage.put_{image,histogram} is only meant to be used by
# tensorboard writer. So we access its internal fields directly from here.
if len(storage._vis_data) >= 1:
for img_name, img, step_num in storage._vis_data:
self._writer.add_image(img_name, img, step_num)
# Storage stores all image data and rely on this writer to clear them.
# As a result it assumes only one writer will use its image data.
# An alternative design is to let storage store limited recent
# data (e.g. only the most recent image) that all writers can access.
# In that case a writer may not see all image data if its period is long.
storage.clear_images()
if len(storage._histograms) >= 1:
for params in storage._histograms:
self._writer.add_histogram_raw(**params)
storage.clear_histograms()
def close(self):
if "_writer" in self.__dict__:
self._writer.close()
class CommonMetricPrinter(EventWriter):
"""
Print **common** metrics to the terminal, including
iteration time, ETA, memory, all losses, and the learning rate.
It also applies smoothing using a window of 20 elements.
It's meant to print common metrics in common ways.
To print something in more customized ways, please implement a similar printer by yourself.
"""
def __init__(self, max_iter: Optional[int] = None, window_size: int = 20):
"""
Args:
max_iter: the maximum number of iterations to train.
Used to compute ETA. If not given, ETA will not be printed.
window_size (int): the losses will be median-smoothed by this window size
"""
self.logger = logging.getLogger("detectron2.utils.events")
self._max_iter = max_iter
self._window_size = window_size
self._last_write = None # (step, time) of last call to write(). Used to compute ETA
def _get_eta(self, storage) -> Optional[str]:
if self._max_iter is None:
return ""
iteration = storage.iter
try:
eta_seconds = storage.history("time").median(1000) * (self._max_iter - iteration - 1)
storage.put_scalar("eta_seconds", eta_seconds, smoothing_hint=False)
return str(datetime.timedelta(seconds=int(eta_seconds)))
except KeyError:
# estimate eta on our own - more noisy
eta_string = None
if self._last_write is not None:
estimate_iter_time = (time.perf_counter() - self._last_write[1]) / (
iteration - self._last_write[0]
)
eta_seconds = estimate_iter_time * (self._max_iter - iteration - 1)
eta_string = str(datetime.timedelta(seconds=int(eta_seconds)))
self._last_write = (iteration, time.perf_counter())
return eta_string
def write(self):
storage = get_event_storage()
iteration = storage.iter
if iteration == self._max_iter:
# This hook only reports training progress (loss, ETA, etc) but not other data,
# therefore do not write anything after training succeeds, even if this method
# is called.
return
try:
avg_data_time = storage.history("data_time").avg(
storage.count_samples("data_time", self._window_size)
)
last_data_time = storage.history("data_time").latest()
except KeyError:
# they may not exist in the first few iterations (due to warmup)
# or when SimpleTrainer is not used
avg_data_time = None
last_data_time = None
try:
avg_iter_time = storage.history("time").global_avg()
last_iter_time = storage.history("time").latest()
except KeyError:
avg_iter_time = None
last_iter_time = None
try:
lr = "{:.5g}".format(storage.history("lr").latest())
except KeyError:
lr = "N/A"
eta_string = self._get_eta(storage)
if torch.cuda.is_available():
max_mem_mb = torch.cuda.max_memory_allocated() / 1024.0 / 1024.0
else:
max_mem_mb = None
# NOTE: max_mem is parsed by grep in "dev/parse_results.sh"
self.logger.info(
str.format(
" {eta}iter: {iter} {losses} {non_losses} {avg_time}{last_time}"
+ "{avg_data_time}{last_data_time} lr: {lr} {memory}",
eta=f"eta: {eta_string} " if eta_string else "",
iter=iteration,
losses=" ".join(
[
"{}: {:.4g}".format(
k, v.median(storage.count_samples(k, self._window_size))
)
for k, v in storage.histories().items()
if "loss" in k
]
),
non_losses=" ".join(
[
"{}: {:.4g}".format(
k, v.median(storage.count_samples(k, self._window_size))
)
for k, v in storage.histories().items()
if "[metric]" in k
]
),
avg_time="time: {:.4f} ".format(avg_iter_time)
if avg_iter_time is not None
else "",
last_time="last_time: {:.4f} ".format(last_iter_time)
if last_iter_time is not None
else "",
avg_data_time="data_time: {:.4f} ".format(avg_data_time)
if avg_data_time is not None
else "",
last_data_time="last_data_time: {:.4f} ".format(last_data_time)
if last_data_time is not None
else "",
lr=lr,
memory="max_mem: {:.0f}M".format(max_mem_mb) if max_mem_mb is not None else "",
)
)
class EventStorage:
"""
The user-facing class that provides metric storage functionalities.
In the future we may add support for storing / logging other types of data if needed.
"""
def __init__(self, start_iter=0):
"""
Args:
start_iter (int): the iteration number to start with
"""
self._history = defaultdict(HistoryBuffer)
self._smoothing_hints = {}
self._latest_scalars = {}
self._iter = start_iter
self._current_prefix = ""
self._vis_data = []
self._histograms = []
def put_image(self, img_name, img_tensor):
"""
Add an `img_tensor` associated with `img_name`, to be shown on
tensorboard.
Args:
img_name (str): The name of the image to put into tensorboard.
img_tensor (torch.Tensor or numpy.array): An `uint8` or `float`
Tensor of shape `[channel, height, width]` where `channel` is
3. The image format should be RGB. The elements in img_tensor
can either have values in [0, 1] (float32) or [0, 255] (uint8).
The `img_tensor` will be visualized in tensorboard.
"""
self._vis_data.append((img_name, img_tensor, self._iter))
def put_scalar(self, name, value, smoothing_hint=True, cur_iter=None):
"""
Add a scalar `value` to the `HistoryBuffer` associated with `name`.
Args:
smoothing_hint (bool): a 'hint' on whether this scalar is noisy and should be
smoothed when logged. The hint will be accessible through
:meth:`EventStorage.smoothing_hints`. A writer may ignore the hint
and apply custom smoothing rule.
It defaults to True because most scalars we save need to be smoothed to
provide any useful signal.
cur_iter (int): an iteration number to set explicitly instead of current iteration
"""
name = self._current_prefix + name
cur_iter = self._iter if cur_iter is None else cur_iter
history = self._history[name]
value = float(value)
history.update(value, cur_iter)
self._latest_scalars[name] = (value, cur_iter)
existing_hint = self._smoothing_hints.get(name)
if existing_hint is not None:
assert (
existing_hint == smoothing_hint
), "Scalar {} was put with a different smoothing_hint!".format(name)
else:
self._smoothing_hints[name] = smoothing_hint
def put_scalars(self, *, smoothing_hint=True, cur_iter=None, **kwargs):
"""
Put multiple scalars from keyword arguments.
Examples:
storage.put_scalars(loss=my_loss, accuracy=my_accuracy, smoothing_hint=True)
"""
for k, v in kwargs.items():
self.put_scalar(k, v, smoothing_hint=smoothing_hint, cur_iter=cur_iter)
def put_histogram(self, hist_name, hist_tensor, bins=1000):
"""
Create a histogram from a tensor.
Args:
hist_name (str): The name of the histogram to put into tensorboard.
hist_tensor (torch.Tensor): A Tensor of arbitrary shape to be converted
into a histogram.
bins (int): Number of histogram bins.
"""
ht_min, ht_max = hist_tensor.min().item(), hist_tensor.max().item()
# Create a histogram with PyTorch
hist_counts = torch.histc(hist_tensor, bins=bins)
hist_edges = torch.linspace(start=ht_min, end=ht_max, steps=bins + 1, dtype=torch.float32)
# Parameter for the add_histogram_raw function of SummaryWriter
hist_params = dict(
tag=hist_name,
min=ht_min,
max=ht_max,
num=len(hist_tensor),
sum=float(hist_tensor.sum()),
sum_squares=float(torch.sum(hist_tensor**2)),
bucket_limits=hist_edges[1:].tolist(),
bucket_counts=hist_counts.tolist(),
global_step=self._iter,
)
self._histograms.append(hist_params)
def history(self, name):
"""
Returns:
HistoryBuffer: the scalar history for name
"""
ret = self._history.get(name, None)
if ret is None:
raise KeyError("No history metric available for {}!".format(name))
return ret
def histories(self):
"""
Returns:
dict[name -> HistoryBuffer]: the HistoryBuffer for all scalars
"""
return self._history
def latest(self):
"""
Returns:
dict[str -> (float, int)]: mapping from the name of each scalar to the most
recent value and the iteration number its added.
"""
return self._latest_scalars
def latest_with_smoothing_hint(self, window_size=20):
"""
Similar to :meth:`latest`, but the returned values
are either the un-smoothed original latest value,
or a median of the given window_size,
depend on whether the smoothing_hint is True.
This provides a default behavior that other writers can use.
Note: All scalars saved in the past `window_size` iterations are used for smoothing.
This is different from the `window_size` definition in HistoryBuffer.
Use :meth:`get_history_window_size` to get the `window_size` used in HistoryBuffer.
"""
result = {}
for k, (v, itr) in self._latest_scalars.items():
result[k] = (
self._history[k].median(self.count_samples(k, window_size))
if self._smoothing_hints[k]
else v,
itr,
)
return result
def count_samples(self, name, window_size=20):
"""
Return the number of samples logged in the past `window_size` iterations.
"""
samples = 0
data = self._history[name].values()
for _, iter_ in reversed(data):
if iter_ > data[-1][1] - window_size:
samples += 1
else:
break
return samples
def smoothing_hints(self):
"""
Returns:
dict[name -> bool]: the user-provided hint on whether the scalar
is noisy and needs smoothing.
"""
return self._smoothing_hints
def step(self):
"""
User should either: (1) Call this function to increment storage.iter when needed. Or
(2) Set `storage.iter` to the correct iteration number before each iteration.
The storage will then be able to associate the new data with an iteration number.
"""
self._iter += 1
@property
def iter(self):
"""
Returns:
int: The current iteration number. When used together with a trainer,
this is ensured to be the same as trainer.iter.
"""
return self._iter
@iter.setter
def iter(self, val):
self._iter = int(val)
@property
def iteration(self):
# for backward compatibility
return self._iter
def __enter__(self):
_CURRENT_STORAGE_STACK.append(self)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
assert _CURRENT_STORAGE_STACK[-1] == self
_CURRENT_STORAGE_STACK.pop()
@contextmanager
def name_scope(self, name):
"""
Yields:
A context within which all the events added to this storage
will be prefixed by the name scope.
"""
old_prefix = self._current_prefix
self._current_prefix = name.rstrip("/") + "/"
yield
self._current_prefix = old_prefix
def clear_images(self):
"""
Delete all the stored images for visualization. This should be called
after images are written to tensorboard.
"""
self._vis_data = []
def clear_histograms(self):
"""
Delete all the stored histograms for visualization.
This should be called after histograms are written to tensorboard.
"""
self._histograms = []