File size: 29,533 Bytes
6a62ffb |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 |
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import io
import logging
import os
import pickle
import random
import socket
import struct
import subprocess
import warnings
from argparse import Namespace
from collections import OrderedDict
from dataclasses import dataclass
from typing import Any, Dict, List, Mapping, Optional
import torch
import torch.distributed as dist
from fairseq.dataclass.configs import DistributedTrainingConfig, FairseqConfig
from omegaconf import open_dict
try:
import torch_xla.core.xla_model as xm
except ImportError:
xm = None
# Flag to indicate if we're using Megatron
# NOTE: this is a temporary hack until we move away from Megatron's model parallel init
_USE_MEGATRON = False
# Whether to use XLA ops (e.g., on TPUs) instead of CUDA ops.
_USE_XLA = False
logger = logging.getLogger(__name__)
def is_master(cfg: DistributedTrainingConfig):
return cfg.distributed_rank == 0
def infer_init_method(cfg: DistributedTrainingConfig, force_distributed=False):
if cfg.distributed_init_method is not None or cfg.tpu:
return
num_pipelines_per_node = None
if cfg.pipeline_model_parallel:
num_pipeline_devices, num_pipelines_per_node = _pipeline_parallel_pre_init(cfg)
if all(
key in os.environ
for key in ["MASTER_ADDR", "MASTER_PORT", "WORLD_SIZE", "RANK"]
):
# support torch.distributed.launch
_infer_torch_distributed_launch_init(cfg)
elif cfg.distributed_port > 0:
# we can determine the init method automatically for Slurm
_infer_slurm_init(cfg, num_pipelines_per_node)
elif cfg.distributed_world_size > 1 or force_distributed:
# fallback for single node with multiple GPUs
_infer_single_node_init(cfg)
if cfg.pipeline_model_parallel:
_pipeline_parallel_post_init(cfg, num_pipeline_devices, num_pipelines_per_node)
elif not cfg.distributed_no_spawn:
with open_dict(cfg):
cfg.distributed_num_procs = min(
torch.cuda.device_count(), cfg.distributed_world_size
)
def _infer_torch_distributed_launch_init(cfg: DistributedTrainingConfig):
cfg.distributed_init_method = "env://"
cfg.distributed_world_size = int(os.environ["WORLD_SIZE"])
cfg.distributed_rank = int(os.environ["RANK"])
# processes are created by torch.distributed.launch
cfg.distributed_no_spawn = True
def _infer_slurm_init(cfg: DistributedTrainingConfig, num_pipelines_per_node):
node_list = os.environ.get("SLURM_STEP_NODELIST")
if node_list is None:
node_list = os.environ.get("SLURM_JOB_NODELIST")
if node_list is not None:
try:
hostnames = subprocess.check_output(
["scontrol", "show", "hostnames", node_list]
)
cfg.distributed_init_method = "tcp://{host}:{port}".format(
host=hostnames.split()[0].decode("utf-8"),
port=cfg.distributed_port,
)
nnodes = int(os.environ.get("SLURM_NNODES"))
ntasks_per_node = os.environ.get("SLURM_NTASKS_PER_NODE")
if ntasks_per_node is not None:
ntasks_per_node = int(ntasks_per_node)
else:
ntasks = int(os.environ.get("SLURM_NTASKS"))
nnodes = int(os.environ.get("SLURM_NNODES"))
assert ntasks % nnodes == 0
ntasks_per_node = int(ntasks / nnodes)
if ntasks_per_node == 1:
gpus_per_node = torch.cuda.device_count()
node_id = int(os.environ.get("SLURM_NODEID"))
cfg.distributed_rank = node_id * gpus_per_node
cfg.distributed_world_size = nnodes * gpus_per_node
elif cfg.pipeline_model_parallel:
assert ntasks_per_node == num_pipelines_per_node, (
"SLURM --ntasks-per-node must match number of pipelines per "
"node (={})".format(num_pipelines_per_node)
)
cfg.distributed_no_spawn = True
# For 4-way MP on nodes with 8 GPUs, ranks will be [0, 1] on
# the first node, [1, 2] on the second node, etc. This
# matches torch.distributed.launch.
node_id = int(os.environ.get("SLURM_NODEID"))
local_id = int(os.environ.get("SLURM_LOCALID"))
cfg.distributed_rank = node_id * num_pipelines_per_node + local_id
# In the above example, device_id will always be in [0, 1],
# which also matches torch.distributed.launch.
cfg.device_id = local_id
# We also want to set distributed_world_size to be the total
# number of pipelines across all nodes.
cfg.distributed_world_size = nnodes * num_pipelines_per_node
else:
assert ntasks_per_node == cfg.distributed_world_size // nnodes
cfg.distributed_no_spawn = True
cfg.distributed_rank = int(os.environ.get("SLURM_PROCID"))
cfg.device_id = int(os.environ.get("SLURM_LOCALID"))
except subprocess.CalledProcessError as e: # scontrol failed
raise e
except FileNotFoundError: # Slurm is not installed
pass
def _infer_single_node_init(cfg: DistributedTrainingConfig):
assert (
cfg.distributed_world_size <= torch.cuda.device_count()
), f"world size is {cfg.distributed_world_size} but have {torch.cuda.device_count()} available devices"
port = random.randint(10000, 20000)
cfg.distributed_init_method = "tcp://localhost:{port}".format(port=port)
def _pipeline_parallel_pre_init(cfg: DistributedTrainingConfig):
from fairseq import utils
balance_exists = (
cfg.pipeline_balance is not None
or cfg.pipeline_encoder_balance is not None
or cfg.pipeline_decoder_balance is not None
)
devices_exist = (
cfg.pipeline_devices is not None
or cfg.pipeline_encoder_devices is not None
or cfg.pipeline_decoder_devices is not None
)
if not balance_exists:
raise ValueError(
"--pipeline-balance is currently required for pipeline model parallelism"
)
if not devices_exist:
raise ValueError(
"--pipeline-devices is currently required for pipeline model parallelism"
)
cfg.pipeline_balance = utils.eval_str_list(cfg.pipeline_balance, type=int)
if cfg.pipeline_devices is not None:
cfg.pipeline_devices = utils.eval_str_list(cfg.pipeline_devices, type=int)
num_pipeline_devices = len(set(cfg.pipeline_devices))
else:
cfg.pipeline_encoder_devices = utils.eval_str_list(
cfg.pipeline_encoder_devices, type=int
)
cfg.pipeline_decoder_devices = utils.eval_str_list(
cfg.pipeline_decoder_devices, type=int
)
num_pipeline_devices = len(
set(cfg.pipeline_encoder_devices + cfg.pipeline_decoder_devices)
)
gpus_per_node = torch.cuda.device_count()
assert (
gpus_per_node >= num_pipeline_devices
and gpus_per_node % num_pipeline_devices == 0
), (
"the number of unique device IDs in --pipeline-devices must evenly divide "
"the number of GPUs per node (multi-node pipelining is not yet supported)"
)
num_pipelines_per_node = gpus_per_node // num_pipeline_devices
return num_pipeline_devices, num_pipelines_per_node
def _pipeline_parallel_post_init(
cfg: DistributedTrainingConfig, num_pipeline_devices, num_pipelines_per_node
):
if not cfg.distributed_no_spawn:
# When distributed_no_spawn is False, we expect distributed_rank and
# distributed_world_size to be based on the total number of GPUs, so
# we need to correct them to be based on the number of pipelines.
assert cfg.distributed_world_size % num_pipeline_devices == 0
cfg.distributed_world_size = cfg.distributed_world_size // num_pipeline_devices
# In the case of 4-way MP on nodes with 8 GPUs, we want
# distributed_rank to be the starting GPU index for each pipeline
# i.e., 0, 2, ...
gpus_per_node = torch.cuda.device_count()
assert cfg.distributed_rank % gpus_per_node == 0
assert cfg.distributed_rank % num_pipeline_devices == 0
with open_dict(cfg):
cfg.distributed_rank = cfg.distributed_rank // num_pipeline_devices
# launch one process per pipeline
cfg.distributed_num_procs = num_pipelines_per_node
# if we have 4-way MP on a node with 8 GPUs, we want device_ids to be 0
# and 4, indicating the starting device IDs for each pipeline
cfg.device_id *= num_pipeline_devices
if cfg.device_id > 0:
# if there's multiple pipelines on a node (e.g., 4-way MP on an 8
# GPU node), we need to adjust pipeline_devices accordingly
logger.debug(
"setting CUDA device={} on rank {}".format(
cfg.device_id, cfg.distributed_rank
)
)
torch.cuda.set_device(cfg.device_id)
with open_dict(cfg):
cfg.pipeline_devices = [cfg.device_id + d for d in cfg.pipeline_devices]
logger.info(
"setting pipeline_devices={} on rank {}".format(
cfg.pipeline_devices, cfg.distributed_rank
)
)
def distributed_init(cfg: FairseqConfig):
if isinstance(cfg, Namespace):
from fairseq.dataclass.utils import convert_namespace_to_omegaconf
cfg = convert_namespace_to_omegaconf(cfg)
if not cfg.common.tpu:
if torch.distributed.is_available() and torch.distributed.is_initialized():
warnings.warn(
"Distributed is already initialized, cannot initialize twice!"
)
else:
logger.info(
"distributed init (rank {}): {}".format(
cfg.distributed_training.distributed_rank,
cfg.distributed_training.distributed_init_method,
)
)
dist.init_process_group(
backend=cfg.distributed_training.distributed_backend,
init_method=cfg.distributed_training.distributed_init_method,
world_size=cfg.distributed_training.distributed_world_size,
rank=cfg.distributed_training.distributed_rank,
)
logger.info(
"initialized host {} as rank {}".format(
socket.gethostname(),
cfg.distributed_training.distributed_rank,
)
)
# perform a dummy all-reduce to initialize the NCCL communicator
if torch.cuda.is_available():
dist.all_reduce(torch.zeros(1).cuda())
cfg.distributed_training.distributed_rank = torch.distributed.get_rank()
else:
assert xm.xrt_world_size() == cfg.distributed_training.distributed_world_size
global _USE_XLA
_USE_XLA = True
cfg.distributed_training.device_id = xm.get_local_ordinal()
cfg.distributed_training.distributed_rank = xm.get_ordinal()
xm.rendezvous("distributed_init") # wait for all workers
if is_master(cfg.distributed_training):
logging.getLogger().setLevel(logging.INFO)
else:
logging.getLogger().setLevel(logging.WARNING)
if cfg.common.model_parallel_size > 1:
try:
from fairseq.model_parallel.megatron.mpu import (
initialize_model_parallel,
model_parallel_cuda_manual_seed,
)
except ImportError:
raise ImportError(
"\n\nPlease install the megatron submodule:"
"\n\n git submodule update --init "
"fairseq/model_parallel/megatron"
)
global _USE_MEGATRON
_USE_MEGATRON = True
initialize_model_parallel(cfg.common.model_parallel_size)
model_parallel_cuda_manual_seed(cfg.common.seed)
model_part_number = get_model_parallel_rank()
cfg.checkpoint.checkpoint_suffix += "-model_part-{0}".format(model_part_number)
if hasattr(cfg, "model") and getattr(cfg.model, "base_layers", 0) > 0:
cfg.checkpoint.checkpoint_suffix = (
f"-rank-{cfg.distributed_training.distributed_rank}"
)
return cfg.distributed_training.distributed_rank
def distributed_main(i, main, cfg: FairseqConfig, kwargs):
cfg.distributed_training.device_id = i
if torch.cuda.is_available() and not cfg.common.cpu and not cfg.common.tpu:
torch.cuda.set_device(cfg.distributed_training.device_id)
if cfg.distributed_training.distributed_rank is None: # torch.multiprocessing.spawn
cfg.distributed_training.distributed_rank = kwargs.pop("start_rank", 0) + i
cfg.distributed_training.distributed_rank = distributed_init(cfg)
after_distributed_init_fn = kwargs.pop("after_distributed_init_fn", None)
if after_distributed_init_fn:
cfg = after_distributed_init_fn(cfg)
main(cfg, **kwargs)
if torch.distributed.is_initialized():
torch.distributed.barrier(get_global_group())
def call_main(cfg: FairseqConfig, main, **kwargs):
if cfg.distributed_training.distributed_init_method is None:
infer_init_method(cfg.distributed_training)
if cfg.distributed_training.distributed_init_method is not None:
# distributed training
if not cfg.distributed_training.distributed_no_spawn:
start_rank = cfg.distributed_training.distributed_rank
cfg.distributed_training.distributed_rank = None # assign automatically
kwargs["start_rank"] = start_rank
torch.multiprocessing.spawn(
fn=distributed_main,
args=(main, cfg, kwargs),
nprocs=min(
torch.cuda.device_count(),
cfg.distributed_training.distributed_world_size,
),
join=True,
)
else:
distributed_main(cfg.distributed_training.device_id, main, cfg, kwargs)
elif cfg.common.tpu and cfg.distributed_training.distributed_world_size > 1:
import torch_xla.distributed.xla_multiprocessing as xmp
torch.multiprocessing.set_sharing_strategy("file_system")
xmp.spawn(
fn=distributed_main,
args=(main, cfg, kwargs),
# tpu-comment:
# 8 devices in one TPU VM, is the max processes to be spawned.
# The rest is driven by xm.distributed.xla_dist
nprocs=min(cfg.distributed_training.distributed_world_size, 8),
)
else:
# single GPU main
main(cfg, **kwargs)
def use_xla():
global _USE_XLA
return _USE_XLA
def new_groups(grouped_ranks: List[List[int]]):
if use_xla():
return ("tpu", grouped_ranks)
else:
groups = [dist.new_group(g) for g in grouped_ranks]
my_group_idx = _find_my_group_index(grouped_ranks)
return groups[my_group_idx]
def _find_my_group_index(grouped_ranks):
my_rank = get_global_rank()
for i, group in enumerate(grouped_ranks):
if my_rank in group:
return i
raise RuntimeError
def _find_my_group(grouped_ranks):
index = _find_my_group_index(grouped_ranks)
return grouped_ranks[index]
def get_rank(group):
if use_xla():
assert group[0] == "tpu"
my_group = _find_my_group(group[1])
return my_group.index(get_global_rank())
else:
return dist.get_rank(group=group)
def get_world_size(group):
if use_xla():
assert group[0] == "tpu"
my_group = _find_my_group(group[1])
return len(my_group)
elif torch.distributed.is_initialized():
return dist.get_world_size(group=group)
else:
return 1
def get_global_group():
if use_xla():
return new_groups([list(range(get_global_world_size()))])
elif torch.distributed.is_initialized():
if not hasattr(get_global_group, "_global_group"):
# ideally we could use torch.distributed.group.WORLD, but it seems
# to cause random NCCL hangs in some cases
get_global_group._global_group = dist.new_group()
return get_global_group._global_group
else:
return None
def get_global_rank():
if use_xla():
return xm.get_ordinal()
elif torch.distributed.is_initialized():
return torch.distributed.get_rank()
else:
return 0
def get_global_world_size():
if use_xla():
return xm.xrt_world_size()
elif torch.distributed.is_initialized():
return torch.distributed.get_world_size()
else:
return 1
def get_data_parallel_group():
"""Get the data parallel group the caller rank belongs to."""
global _USE_MEGATRON
if _USE_MEGATRON:
from fairseq.model_parallel.megatron import mpu
return mpu.get_data_parallel_group()
else:
return get_global_group()
def get_data_parallel_rank():
"""Return my rank for the data parallel group."""
return get_rank(get_data_parallel_group())
def get_data_parallel_world_size():
"""Return world size for the data parallel group."""
return get_world_size(get_data_parallel_group())
def get_model_parallel_group():
global _USE_MEGATRON
if _USE_MEGATRON:
from fairseq.model_parallel.megatron import mpu
return mpu.get_model_parallel_group()
else:
return None
def get_model_parallel_rank():
"""Return my rank for the model parallel group."""
return get_rank(get_model_parallel_group())
def get_model_parallel_world_size():
"""Return world size for the model parallel group."""
return get_world_size(get_model_parallel_group())
def all_reduce(tensor, group, op="sum"):
if use_xla():
assert isinstance(group, tuple) and group[0] == "tpu"
tensor = [tensor] # wrap in a list to make xm.all_reduce in-place
return xm.all_reduce(op, tensor, groups=group[1])[0]
else:
if op == "sum":
op = dist.ReduceOp.SUM
elif op == "max":
op = dist.ReduceOp.MAX
else:
raise NotImplementedError
dist.all_reduce(tensor, op=op, group=group)
return tensor
def broadcast(tensor, src, group):
if use_xla():
# XLA doesn't support broadcast, hack it with all_reduce
if get_rank(group) != src:
tensor.zero_()
all_reduce(tensor, group)
else:
dist.broadcast(tensor, src=src, group=group)
def all_to_all(tensor, group):
"""Perform an all-to-all operation on a 1D Tensor."""
assert tensor.dim() == 1
split_count = get_world_size(group=group)
assert tensor.numel() % split_count == 0
if use_xla():
assert isinstance(group, tuple) and group[0] == "tpu"
return xm.all_to_all(
tensor,
split_dimension=0,
concat_dimension=0,
split_count=split_count,
groups=group[1],
)
else:
output = torch.zeros_like(tensor)
dist.all_to_all_single(output, tensor, group=group)
return output
def all_gather(tensor, group, return_tensor=False):
"""Perform an all-gather operation."""
if use_xla():
result = xm.all_gather(tensor, groups=group[1])
world_size = get_world_size(group=group)
result = result.view(world_size, *tensor.size())
if return_tensor:
return result
else:
return [result[i] for i in range(world_size)]
else:
world_size = get_world_size(group=group)
rank = get_rank(group=group)
tensor_list = [
tensor if i == rank else torch.empty_like(tensor) for i in range(world_size)
]
dist.all_gather(tensor_list, tensor, group=group)
if return_tensor:
return torch.stack(tensor_list, dim=0)
else:
return tensor_list
def all_gather_list(data, group=None, max_size=16384):
"""Gathers arbitrary data from all nodes into a list.
Similar to :func:`~torch.distributed.all_gather` but for arbitrary Python
data. Note that *data* must be picklable and any CUDA tensors will be moved
to CPU and returned on CPU as well.
Args:
data (Any): data from the local worker to be gathered on other workers
group: group of the collective
max_size (int, optional): maximum size of the data to be gathered
across workers
"""
from fairseq import utils
if group is None:
group = get_global_group()
rank = get_rank(group=group)
world_size = get_world_size(group=group)
buffer_size = max_size * world_size
if (
not hasattr(all_gather_list, "_buffer")
or all_gather_list._buffer.numel() < buffer_size
):
all_gather_list._buffer = torch.cuda.ByteTensor(buffer_size)
all_gather_list._cpu_buffer = torch.ByteTensor(max_size).pin_memory()
buffer = all_gather_list._buffer
buffer.zero_()
cpu_buffer = all_gather_list._cpu_buffer
data = utils.move_to_cpu(data)
enc = pickle.dumps(data)
enc_size = len(enc)
header_size = 4 # size of header that contains the length of the encoded data
size = header_size + enc_size
if size > max_size:
raise ValueError(
"encoded data size ({}) exceeds max_size ({})".format(size, max_size)
)
header = struct.pack(">I", enc_size)
cpu_buffer[:size] = torch.ByteTensor(list(header + enc))
start = rank * max_size
buffer[start : start + size].copy_(cpu_buffer[:size])
all_reduce(buffer, group=group)
buffer = buffer.cpu()
try:
result = []
for i in range(world_size):
out_buffer = buffer[i * max_size : (i + 1) * max_size]
(enc_size,) = struct.unpack(">I", bytes(out_buffer[:header_size].tolist()))
if enc_size > 0:
result.append(
pickle.loads(
bytes(out_buffer[header_size : header_size + enc_size].tolist())
)
)
return result
except pickle.UnpicklingError:
raise Exception(
"Unable to unpickle data from other workers. all_gather_list requires all "
"workers to enter the function together, so this error usually indicates "
"that the workers have fallen out of sync somehow. Workers can fall out of "
"sync if one of them runs out of memory, or if there are other conditions "
"in your training script that can cause one worker to finish an epoch "
"while other workers are still iterating over their portions of the data. "
"Try rerunning with --ddp-backend=legacy_ddp and see if that helps."
)
def all_reduce_dict(data: Mapping[str, Any], device, group) -> Dict[str, Any]:
"""
AllReduce a dictionary of values across workers. We separately
reduce items that are already on the device and items on CPU for
better performance.
Args:
data (Mapping[str, Any]): dictionary of data to all-reduce, but
cannot be a nested dictionary
device (torch.device): device for the reduction
group: group of the collective
"""
data_keys = list(data.keys())
# We want to separately reduce items that are already on the
# device and items on CPU for performance reasons.
cpu_data = OrderedDict()
device_data = OrderedDict()
for k in data_keys:
t = data[k]
if not torch.is_tensor(t):
cpu_data[k] = torch.tensor(t, dtype=torch.double)
elif t.device.type != device.type:
cpu_data[k] = t.to(dtype=torch.double)
else:
device_data[k] = t.to(dtype=torch.double)
def _all_reduce_dict(data: OrderedDict):
if len(data) == 0:
return data
buf = torch.cat([t.view(-1) for t in data.values()]).to(device=device)
all_reduce(buf, group=group)
split_buf = torch.split(buf.clone(), [t.numel() for t in data.values()])
reduced_data = [t.view_as(orig) for t, orig in zip(split_buf, data.values())]
return OrderedDict(zip(data.keys(), reduced_data))
cpu_data = _all_reduce_dict(cpu_data)
device_data = _all_reduce_dict(device_data)
def get_from_stack(key):
if key in cpu_data:
return cpu_data[key]
elif key in device_data:
return device_data[key]
raise KeyError
return OrderedDict([(key, get_from_stack(key)) for key in data_keys])
def broadcast_tensors(
tensors: Optional[List[torch.Tensor]],
src_rank: int,
group: object,
dist_device: Optional[torch.device] = None,
) -> List[torch.Tensor]:
"""
Broadcasts a list of tensors without other (non-src) ranks needing to know
the dtypes/shapes of the tensors.
"""
if dist_device is None:
if torch.distributed.get_backend(group) == "nccl":
dist_device = torch.device("cuda")
else:
dist_device = torch.device("cpu")
# share metadata first to simplify transfer
is_src_rank = get_rank(group) == src_rank
if is_src_rank:
metadata = [
{"size": t.size(), "dtype": t.dtype, "device": t.device} for t in tensors
]
metadata = _broadcast_object_slow(metadata, src_rank, group, dist_device)
else:
metadata = _broadcast_object_slow(None, src_rank, group, dist_device)
out_tensors = []
for i, meta in enumerate(metadata):
if is_src_rank:
tensor = tensors[i]
broadcast(tensors[i].to(dist_device), src=src_rank, group=group)
else:
tensor = torch.zeros(
[meta["size"].numel()], dtype=meta["dtype"], device=dist_device
)
broadcast(tensor, src=src_rank, group=group)
tensor = tensor.view(meta["size"]).to(meta["device"])
out_tensors.append(tensor)
return out_tensors
def broadcast_object(
obj: Any,
src_rank: int,
group: object,
dist_device: Optional[torch.device] = None,
) -> Any:
"""Broadcast an arbitrary Python object to other workers."""
if dist_device is None:
if torch.distributed.get_backend(group) == "nccl":
dist_device = torch.device("cuda")
else:
dist_device = torch.device("cpu")
if get_rank(group) == src_rank:
# split the tensors from the non-tensors so we can broadcast them
# directly, avoiding unnecessary serialization/deserialization
tensors = []
obj = _split_tensors_from_obj(obj, tensors)
obj = _broadcast_object_slow(obj, src_rank, group, dist_device)
tensors = broadcast_tensors(tensors, src_rank, group, dist_device)
else:
obj = _broadcast_object_slow(None, src_rank, group, dist_device)
tensors = broadcast_tensors(None, src_rank, group, dist_device)
return _put_tensors_in_obj(obj, tensors)
def _broadcast_object_slow(
obj: Any,
src_rank: int,
group: object,
dist_device: torch.device,
) -> Any:
if get_rank(group) == src_rank:
# Emit data
buffer = io.BytesIO()
torch.save(obj, buffer)
buffer = torch.ByteTensor(buffer.getbuffer()).to(dist_device)
length = torch.LongTensor([len(buffer)]).to(dist_device)
broadcast(length, src=src_rank, group=group)
broadcast(buffer, src=src_rank, group=group)
else:
# Fetch from the source
length = torch.LongTensor([0]).to(dist_device)
broadcast(length, src=src_rank, group=group)
buffer = torch.ByteTensor(int(length.item())).to(dist_device)
broadcast(buffer, src=src_rank, group=group)
buffer = io.BytesIO(buffer.cpu().numpy())
obj = torch.load(buffer, map_location="cpu")
return obj
@dataclass(frozen=True)
class _TensorPlaceholder:
index: int
def _split_tensors_from_obj(obj: Any, tensors: List[torch.Tensor]) -> Any:
if torch.is_tensor(obj):
placeholder = _TensorPlaceholder(index=len(tensors))
tensors.append(obj)
return placeholder
elif isinstance(obj, dict):
return {k: _split_tensors_from_obj(v, tensors) for k, v in obj.items()}
elif isinstance(obj, list):
return [_split_tensors_from_obj(v, tensors) for v in obj]
elif isinstance(obj, tuple):
return tuple(_split_tensors_from_obj(v, tensors) for v in obj)
elif isinstance(obj, set):
return {_split_tensors_from_obj(v, tensors) for v in obj}
else:
return obj
def _put_tensors_in_obj(obj: Any, tensors: List[torch.Tensor]) -> Any:
if isinstance(obj, _TensorPlaceholder):
return tensors[obj.index]
elif isinstance(obj, dict):
return {k: _put_tensors_in_obj(v, tensors) for k, v in obj.items()}
elif isinstance(obj, list):
return [_put_tensors_in_obj(v, tensors) for v in obj]
elif isinstance(obj, tuple):
return tuple(_put_tensors_in_obj(v, tensors) for v in obj)
elif isinstance(obj, set):
return {_put_tensors_in_obj(v, tensors) for v in obj}
else:
return obj
|