# Copyright (c) Aishwarya Kamath & Nicolas Carion. Licensed under the Apache License 2.0. All Rights Reserved
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
"""
Utilities related to distributed mode.

By default, the reduction of metrics and the like is done on GPU, since it's more straightforward (we reuse the NCCL backend).
If you want to reduce on CPU instead (required for big datasets like GQA), set the environment variable MDETR_CPU_REDUCE=1.
"""
import datetime
import functools
import io
import os

import torch
import torch.distributed as dist

_LOCAL_PROCESS_GROUP = None
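
# Illustrative only: a minimal sketch of flipping the MDETR_CPU_REDUCE switch described in the
# module docstring. Setting it from Python like this is an assumption made for the example; it is
# normally exported in the launch script, and it must be set before the first all_gather call.
#
#   os.environ["MDETR_CPU_REDUCE"] = "1"  # route all_gather through a gloo (CPU) process group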

@functools.lru_cache()
def _get_global_gloo_group():
    """
    Return a process group based on the gloo backend, containing all the ranks.
    The result is cached.
    """
    if dist.get_backend() == "nccl":
        return dist.new_group(backend="gloo")
    return dist.group.WORLD

def all_gather(data):
    """
    Run all_gather on arbitrary picklable data (not necessarily tensors).

    Args:
        data: any picklable object
    Returns:
        list[data]: list of data gathered from each rank
    """
    world_size = get_world_size()
    if world_size == 1:
        return [data]

    cpu_group = None
    if os.getenv("MDETR_CPU_REDUCE") == "1":
        cpu_group = _get_global_gloo_group()

    buffer = io.BytesIO()
    torch.save(data, buffer)
    data_view = buffer.getbuffer()
    device = "cuda" if cpu_group is None else "cpu"
    tensor = torch.ByteTensor(data_view).to(device)

    # obtain Tensor size of each rank
    local_size = torch.tensor([tensor.numel()], device=device, dtype=torch.long)
    size_list = [torch.tensor([0], device=device, dtype=torch.long) for _ in range(world_size)]
    if cpu_group is None:
        dist.all_gather(size_list, local_size)
    else:
        print("gathering on cpu")
        dist.all_gather(size_list, local_size, group=cpu_group)
    size_list = [int(size.item()) for size in size_list]
    max_size = max(size_list)
    assert isinstance(local_size.item(), int)
    local_size = int(local_size.item())

    # receiving Tensor from all ranks
    # we pad the tensor because torch all_gather does not support
    # gathering tensors of different shapes
    tensor_list = []
    for _ in size_list:
        tensor_list.append(torch.empty((max_size,), dtype=torch.uint8, device=device))
    if local_size != max_size:
        padding = torch.empty(size=(max_size - local_size,), dtype=torch.uint8, device=device)
        tensor = torch.cat((tensor, padding), dim=0)
    if cpu_group is None:
        dist.all_gather(tensor_list, tensor)
    else:
        dist.all_gather(tensor_list, tensor, group=cpu_group)

    data_list = []
    for size, tensor in zip(size_list, tensor_list):
        tensor = torch.split(tensor, [size, max_size - size], dim=0)[0]
        buffer = io.BytesIO(tensor.cpu().numpy())
        obj = torch.load(buffer)
        data_list.append(obj)
    return data_list
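
# Illustrative only: a minimal sketch of gathering per-rank evaluation results with `all_gather`
# and merging them on the main process. `local_predictions` is a hypothetical list of picklable
# per-rank outputs, not something defined in this module.
#
#   local_predictions = [...]                      # any picklable objects produced by this rank
#   gathered = all_gather(local_predictions)       # list of length world_size, same on every rank
#   if is_main_process():
#       merged = [p for per_rank in gathered for p in per_rank]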

def reduce_dict(input_dict, average=True):
    """
    Args:
        input_dict (dict): all the values will be reduced
        average (bool): whether to do average or sum
    Reduce the values in the dictionary from all processes so that all processes
    have the averaged results. Returns a dict with the same fields as
    input_dict, after reduction.
    """
    world_size = get_world_size()
    if world_size < 2:
        return input_dict
    with torch.no_grad():
        names = []
        values = []
        # sort the keys so that they are consistent across processes
        for k in sorted(input_dict.keys()):
            names.append(k)
            values.append(input_dict[k])
        values = torch.stack(values, dim=0)
        dist.all_reduce(values)
        if average:
            values /= world_size
        reduced_dict = {k: v for k, v in zip(names, values)}
    return reduced_dict
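
# Illustrative only: a sketch of the typical logging use of `reduce_dict`, assuming a hypothetical
# `loss_dict` mapping names to scalar tensors on the current CUDA device (e.g. the per-term losses
# returned by a criterion). The names below are assumptions, not part of this module.
#
#   loss_dict_reduced = reduce_dict(loss_dict)     # averaged across processes when average=True
#   if is_main_process():
#       print({k: v.item() for k, v in loss_dict_reduced.items()})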

def setup_for_distributed(is_master):
    """
    This function disables printing when not in master process
    """
    import builtins as __builtin__

    builtin_print = __builtin__.print

    def print(*args, **kwargs):
        force = kwargs.pop("force", False)
        if is_master or force:
            builtin_print(*args, **kwargs)

    __builtin__.print = print
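
# Illustrative only: once `setup_for_distributed` has patched the builtin, non-master ranks can
# still force a message through with the extra `force` keyword, e.g. for per-rank diagnostics.
#
#   print(f"rank {get_rank()} reached the barrier", force=True)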

def is_dist_avail_and_initialized():
    """
    Returns:
        True if distributed training is enabled
    """
    if not dist.is_available():
        return False
    if not dist.is_initialized():
        return False
    return True


def get_world_size():
    """
    Returns:
        The number of processes in the process group
    """
    if not is_dist_avail_and_initialized():
        return 1
    return dist.get_world_size()


def get_rank():
    """
    Returns:
        The rank of the current process within the global process group.
    """
    if not is_dist_avail_and_initialized():
        return 0
    return dist.get_rank()


def get_local_rank() -> int:
    """
    Returns:
        The rank of the current process within the local (per-machine) process group.
    """
    if not dist.is_available():
        return 0
    if not dist.is_initialized():
        return 0
    assert _LOCAL_PROCESS_GROUP is not None
    return dist.get_rank(group=_LOCAL_PROCESS_GROUP)


def get_local_size() -> int:
    """
    Returns:
        The size of the per-machine process group,
        i.e. the number of processes per machine.
    """
    if not dist.is_available():
        return 1
    if not dist.is_initialized():
        return 1
    return dist.get_world_size(group=_LOCAL_PROCESS_GROUP)


def is_main_process():
    """Return true if the current process is the main one"""
    return get_rank() == 0


def save_on_master(*args, **kwargs):
    """Utility function to save only from the main process"""
    if is_main_process():
        torch.save(*args, **kwargs)

def init_distributed_mode(args):
    """Initialize distributed training, if appropriate"""
    if "RANK" in os.environ and "WORLD_SIZE" in os.environ:
        args.rank = int(os.environ["RANK"])
        args.world_size = int(os.environ["WORLD_SIZE"])
        args.gpu = int(os.environ["LOCAL_RANK"])
    elif "SLURM_PROCID" in os.environ:
        args.rank = int(os.environ["SLURM_PROCID"])
        args.gpu = args.rank % torch.cuda.device_count()
    else:
        print("Not using distributed mode")
        args.distributed = False
        return

    args.distributed = True

    torch.cuda.set_device(args.gpu)
    args.dist_backend = "nccl"
    print("| distributed init (rank {}): {}".format(args.rank, args.dist_url), flush=True)
    dist.init_process_group(
        backend=args.dist_backend,
        init_method=args.dist_url,
        world_size=args.world_size,
        rank=args.rank,
        timeout=datetime.timedelta(seconds=7200),
    )
    dist.barrier()
    setup_for_distributed(args.debug or args.rank == 0)
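
# Illustrative only: a minimal sketch of how a training script might call `init_distributed_mode`.
# The argparse flags below (`--dist-url`, `--debug`) are assumptions mirroring the attributes this
# function reads (args.dist_url, args.debug); they are not defined in this module.
#
#   import argparse
#
#   parser = argparse.ArgumentParser()
#   parser.add_argument("--dist-url", default="env://")   # use RANK/WORLD_SIZE/LOCAL_RANK from the launcher
#   parser.add_argument("--debug", action="store_true")   # keep printing on every rank
#   args = parser.parse_args()
#   init_distributed_mode(args)                            # sets args.distributed, args.rank, args.gpu, ...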