# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
import logging
import os
import os.path as osp
import sys
from typing import Callable, Optional, Union

import torch

from mmengine.dist import master_only
from mmengine.hooks import Hook
from mmengine.logging import print_log
from mmengine.registry import HOOKS


def check_kineto() -> bool:  # noqa
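    # Kineto is the low-level library that backs ``torch.profiler``;
    # ``torch.autograd.kineto_available`` is missing in older torch builds,
    # hence the AttributeError guard below.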
    kineto_exist = False
    try:
        if torch.autograd.kineto_available():
            kineto_exist = True
    except AttributeError:
        print_log('NO KINETO', logger='current', level=logging.WARNING)
    return kineto_exist


@HOOKS.register_module()
class ProfilerHook(Hook):
    """A hook to analyze performance during training and inference.

    PyTorch Profiler is a tool that allows the collection of performance
    metrics during training. More details on Profiler can be found at
    `official docs <https://pytorch.org/docs/stable/profiler.html
    #torch.profiler.profile>`_

    Args:
        by_epoch (bool): Profile performance by epoch or by iteration.
            Defaults to True.
        profile_times (int): The period (epoch/iter) recorded by the
            profiler. Defaults to 1. For example, profile_times=10 and
            by_epoch=False mean that the first 10 iterations are recorded.
        activity_with_cpu (bool): Whether to include CPU activities in the
            analysis. Defaults to True.
        activity_with_cuda (bool): Whether to include CUDA activities in the
            analysis. Defaults to False.
        schedule (dict, optional): Key-word arguments passed to
            `torch.profiler.schedule <https://pytorch.org/docs/stable/
            profiler.html#torch.profiler.schedule>`_.
            Defaults to None, which means profiling without a schedule.
        on_trace_ready (callable, dict, optional): Either a handler or a dict
            used to generate a handler. Defaults to None, which means
            profiling without an on_trace_ready. The Callable type needs to
            construct its own function that can handle
            ``torch.autograd.profiler.profile``.
            Two officially recommended ways are provided:

            - ``on_trace_ready=dict(type='log_trace')``: Print the profiling
              result in the terminal. See more details in the
              `PyTorch official tutorial`_. The configurable arguments are
              the same as ``prof.key_averages().table``.
            - ``on_trace_ready=dict(type='tb_trace')``: Profile the
              performance with tensorboard. See more details in the tutorial
              `profile with tensorboard`_.
        record_shapes (bool): Save information about operator's input shapes.
            Defaults to False.
        profile_memory (bool): Track tensor memory allocation/deallocation.
            Defaults to False.
        with_stack (bool): Record source information (file and line number)
            for the ops. Defaults to False.
        with_flops (bool): Use formula to estimate the FLOPs of specific
            operators (matrix multiplication and 2D convolution).
            Defaults to False.
        json_trace_path (str, optional): Exports the collected trace in
            Chrome JSON format. Chrome uses 'chrome://tracing' to view the
            json file. Defaults to None, which means profiling does not
            store json files.

    Warnings:
        The profiler will be closed after ``profile_times`` iterations
        automatically. Please make sure the configuration of your schedule
        will not close the profiler before the iteration reaches the value
        of ``profile_times``.

    Examples:
        >>> # tensorboard trace
        >>> trace_config = dict(type='tb_trace')
        >>> profiler_hook_cfg = dict(on_trace_ready=trace_config)
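        >>> # log trace (illustrative sketch): print a table of op-level
        >>> # stats in the terminal; the extra keys are forwarded to
        >>> # ``prof.key_averages().table``
        >>> trace_config = dict(type='log_trace',
        ...                     sort_by='self_cpu_time_total',
        ...                     row_limit=10)
        >>> profiler_hook_cfg = dict(on_trace_ready=trace_config)
        >>> # iteration-based profiling with a schedule (illustrative
        >>> # sketch); wait/warmup/active are forwarded to
        >>> # ``torch.profiler.schedule``
        >>> profiler_hook_cfg = dict(
        ...     by_epoch=False,
        ...     profile_times=10,
        ...     schedule=dict(wait=1, warmup=1, active=3),
        ...     on_trace_ready=trace_config)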

    .. _PyTorch official tutorial: https://pytorch.org/tutorials/recipes/recipes/profiler_recipe.html#using-profiler-to-analyze-execution-time
    .. _profile with tensorboard: https://pytorch.org/tutorials/intermediate/tensorboard_profiler_tutorial.html#pytorch-profiler-with-tensorboard
    """  # noqa: E501

    priority = 'VERY_LOW'

    def __init__(self,
                 *,
                 by_epoch: bool = True,
                 profile_times: int = 1,
                 activity_with_cpu: bool = True,
                 activity_with_cuda: bool = False,
                 schedule: Optional[dict] = None,
                 on_trace_ready: Union[Callable, dict, None] = None,
                 record_shapes: bool = False,
                 profile_memory: bool = False,
                 with_stack: bool = False,
                 with_flops: bool = False,
                 json_trace_path: Optional[str] = None) -> None:
        try:
            from torch import profiler
        except ImportError:
            raise ImportError('please upgrade torch above 1.8.1')
        if not check_kineto():
            raise ImportError('Due to Kineto support issues, please upgrade '
                              'pytorch above 1.8.1 (windows users above '
                              '1.9.1)')

        assert isinstance(by_epoch, bool), '``by_epoch`` should be a boolean.'
        self.by_epoch = by_epoch

        if profile_times < 1:
            raise ValueError('profile_times should be greater than 0, '
                             f'but got {profile_times}')
        if by_epoch and profile_times > 1:
            raise ValueError(
                f'Profiler will profile 0-{profile_times} epochs.\n'
                'Since the profiler will slow down the training, it is '
                'recommended to train 1 epoch with ProfilerHook and adjust '
                'your setting according to the profiler summary.\n'
                'During normal training (epoch > 1), '
                'you may disable the ProfilerHook.')
        self.profile_times = profile_times

        assert isinstance(activity_with_cpu, bool), \
            '``activity_with_cpu`` should be a boolean.'
        assert isinstance(activity_with_cuda, bool), \
            '``activity_with_cuda`` should be a boolean.'
        self.activities = []
        if activity_with_cpu:
            self.activities.append(profiler.ProfilerActivity.CPU)
        if activity_with_cuda:
            self.activities.append(profiler.ProfilerActivity.CUDA)

        if schedule is not None:
            assert isinstance(schedule, dict), \
                '``schedule`` should be a dict.'
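            # ``schedule`` kwargs map to ``torch.profiler.schedule``, e.g.
            # dict(wait=1, warmup=1, active=3): per cycle, skip one step,
            # warm up for one step, then record the next three steps.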
            self.schedule = profiler.schedule(**schedule)
        else:
            self.schedule = None
        self.on_trace_ready = on_trace_ready
        self.record_shapes = record_shapes
        self.profile_memory = profile_memory
        self.with_stack = with_stack
        self.with_flops = with_flops
        self.json_trace_path = json_trace_path
        self._closed = False

    def before_run(self, runner):
        """Initialize the profiler.

        The ``runner`` is used to further validate ``profile_times``
        against the total number of training epochs/iterations.
        """
        max_times = runner.max_epochs if self.by_epoch else runner.max_iters
        if max_times < self.profile_times:
            raise ValueError(
                f'``profile_times`` should not be greater than {max_times}')

        on_trace_ready = self._parse_trace_config(runner)

        self.profiler = torch.profiler.profile(  # noqa
            activities=self.activities,
            schedule=self.schedule,
            on_trace_ready=on_trace_ready,
            record_shapes=self.record_shapes,
            profile_memory=self.profile_memory,
            with_stack=self.with_stack,
            with_flops=self.with_flops)
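        # Enter the profiler context manually; it is exited later by
        # ``_export_chrome_trace`` once profiling finishes.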
        self.profiler.__enter__()
        runner.logger.info('profiler is profiling...')

    def _parse_trace_config(self, runner):
        """Parse the ``on_trace_ready`` parameter."""
        if self.on_trace_ready is None:
            _on_trace_ready = None
        elif callable(self.on_trace_ready):
            _on_trace_ready = self.on_trace_ready
        elif isinstance(self.on_trace_ready, dict):
            trace_cfg = self.on_trace_ready.copy()
            trace_type = trace_cfg.pop('type')

            # Build a handler that prints the profiling table.
            if trace_type == 'log_trace':

                def _log_handler(_profile):
                    print(_profile.key_averages().table(**trace_cfg))

                _on_trace_ready = _log_handler

            elif trace_type == 'tb_trace':  # tensorboard_trace handler
                try:
                    import torch_tb_profiler  # noqa: F401
                except ImportError:
                    raise ImportError(
                        'please run ``pip install torch-tb-profiler``')
                if 'dir_name' not in trace_cfg:
                    trace_cfg['dir_name'] = osp.join(runner.log_dir,
                                                     'tf_tracing_logs')
                elif not osp.isabs(trace_cfg['dir_name']):
                    trace_cfg['dir_name'] = osp.join(runner.log_dir,
                                                     trace_cfg['dir_name'])
                runner.logger.info('trace_files of ProfilerHook will be '
                                   f'saved to {trace_cfg["dir_name"]}.')

                if self.json_trace_path is not None:
                    runner.logger.warning(
                        'When using tensorboard_trace, it is recommended to '
                        'save json files by setting ``worker_name`` instead '
                        'of setting ``json_trace_path``')
                _on_trace_ready = torch.profiler.tensorboard_trace_handler(
                    **trace_cfg)
            else:
                raise ValueError('trace_type should be "log_trace" or '
                                 f'"tb_trace", but got {trace_type}')
        else:
            raise ValueError(
                '``on_trace_ready`` should be a Callable, a dict or None, '
                f'but got {self.on_trace_ready}')
        return _on_trace_ready

    def after_train_epoch(self, runner):
        """Export the results if the profiler has not been closed yet."""
        # `after_train_epoch` will also be called in IterBasedTrainLoop.
        # Here we check `self._closed` to avoid exiting twice.
        if not self._closed:
            self._export_chrome_trace(runner)

    def after_train_iter(self, runner, batch_idx, data_batch, outputs):
        """Call ``profiler.step()`` after each iteration if not closed."""
        if not self._closed:
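            # Advance the profiler by one step so a configured ``schedule``
            # can move through its wait/warmup/active phases.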
            self.profiler.step()
        if runner.iter == self.profile_times - 1 and not self.by_epoch:
            self._export_chrome_trace(runner)

    def _export_chrome_trace(self, runner):
        """Stop the profiler and export the collected results."""
        self._closed = True
        runner.logger.info('profiler may take a few minutes...')
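        # Exiting the context stops the profiler; when no ``schedule`` is
        # set, this is the point at which ``on_trace_ready`` fires.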
        self.profiler.__exit__(None, None, None)
        if self.json_trace_path is not None:
            self.profiler.export_chrome_trace(self.json_trace_path)


@HOOKS.register_module()
class NPUProfilerHook(Hook):
    """NPUProfiler to analyze performance during training.

    NPU Profiling is used to count the device execution time of all
    operators. The ``torch_npu.npu.profile`` interface collects profiling
    data at each stage of the project; the data is analyzed by the msprof
    tool and can be dumped for further manual analysis of the key
    performance bottlenecks. For more details on the
    ``torch_npu.npu.profile`` interface, please visit
    https://gitee.com/ascend/pytorch/blob/master/torch_npu/npu/profiler.py#profile

    Args:
        begin (int): The iteration at which profiling starts. Defaults to 0.
        end (int): The iteration at which profiling ends. Defaults to 1.
        result_path (str): The path to save the profiling results file.
            Defaults to 'cann_profiling'.
        exit_after_profiling (bool): Whether to exit the program after
            profiling. Defaults to True.
        use_e2e_profiler (bool): Turn on E2E profiling. E2E profiling
            combines performance data at the PyTorch level and the NPU level
            to analyze the bottlenecks of model performance end-to-end; it
            cannot show detailed content and serves only as an auxiliary
            analysis. Defaults to False.
        ge_profiling_to_std_out (bool): Turn on GE profiling, which collects
            the profiling data of the host-side scheduling of the Ascend
            device. Defaults to False.

    Examples:
        >>> cfg = ...
        >>> profiler_config = dict(type='NPUProfilerHook', end=2)
        >>> cfg.merge_from_dict({'custom_hooks': [profiler_config]})
        >>> runner = Runner.from_cfg(cfg)
        >>> runner.train()
    """

    priority = 'VERY_LOW'

    def __init__(self,
                 *,
                 begin: int = 0,
                 end: int = 1,
                 result_path: str = 'cann_profiling',
                 exit_after_profiling: bool = True,
                 use_e2e_profiler: bool = False,
                 ge_profiling_to_std_out: bool = False):
        try:
            import torch_npu
        except ImportError:
            raise ImportError('Failed to import torch_npu module')

        if begin >= end:
            raise ValueError(
                'The iteration to start profiling should not be greater '
                'than or equal to the iteration to end profiling')

        self.begin = begin
        self.end = end
        self.result_path = result_path
        self.exit_after_profiling = exit_after_profiling

        if ge_profiling_to_std_out:
            os.environ['GE_PROFILING_TO_STD_OUT'] = '1'

        if not osp.exists(self.result_path):
            os.makedirs(self.result_path, exist_ok=True)
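        # ``torch_npu.npu.profile`` is a context manager: it is entered in
        # ``before_train_iter`` at iteration ``begin`` and exited in
        # ``after_train_iter`` at iteration ``end - 1``.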
        self.profiler = torch_npu.npu.profile(
            self.result_path, use_e2e_profiler=use_e2e_profiler)

    def before_run(self, runner):
        if self.end > runner.max_iters:
            raise ValueError(
                'The profiling end iteration should not be greater '
                'than the max iteration')

    def before_train_iter(self, runner, batch_idx, data_batch=None):
        if runner.iter == self.begin:
            self.profiler.__enter__()
            runner.logger.info('NPUProfiler starts profiling...')

    def after_train_iter(self,
                         runner,
                         batch_idx,
                         data_batch=None,
                         outputs=None):
        if runner.iter == self.end - 1:
            runner.logger.info('profiler may take a few minutes to '
                               'save the profiling result.')
            self.profiler.__exit__(None, None, None)
            if self.exit_after_profiling:
                sys.exit()