# Source code for paddlenlp.trainer.utils.helper

# Copyright 2020-present the HuggingFace Inc. team.
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This file is modified from

import collections
import copy
import os
from typing import Any, Optional

import numpy as np
import paddle
import paddle.distributed as dist
from paddle.distributed import fleet

from paddlenlp.utils.log import logger

# Public API of this module.
# NOTE(review): the original list was truncated. The entries below are the
# helpers marked as documented in this file plus `distributed_concat`;
# confirm against the upstream module before relying on `import *`.
__all__ = [
    "distributed_concat",
    "paddle_pad_and_concatenate",
    "nested_concat",
    "nested_detach",
    "nested_numpify",
    "nested_truncate",
]

def distributed_concat(tensor: Any, num_total_examples: Optional[int] = None) -> Any:
    """Gather `tensor` from all ranks and concatenate the pieces along axis 0.

    Args:
        tensor: A tensor, or a (possibly nested) list/tuple of tensors.
            Containers are processed element by element and rebuilt with the
            same container type.
        num_total_examples: When not None, the concatenated result is sliced
            to its first `num_total_examples` entries.

    Returns:
        The concatenated tensor, or a container mirroring `tensor`.

    Raises:
        AssertionError: With the message "Not currently using distributed
            training" whenever an AssertionError is raised while gathering.
    """
    try:
        if isinstance(tensor, (tuple, list)):
            return type(tensor)(distributed_concat(t, num_total_examples) for t in tensor)
        output_tensors = []
        dist.all_gather(output_tensors, tensor)
        # 0-d tensors are reshaped to 1-d so they can be concatenated on axis 0.
        output_tensors = [t if len(t.shape) > 0 else t.reshape_([-1]) for t in output_tensors]
        concat = paddle.concat(output_tensors, axis=0)

        # truncate the dummy elements added by SequentialDistributedSampler
        if num_total_examples is not None:
            concat = concat[:num_total_examples]
        return concat
    except AssertionError:
        raise AssertionError("Not currently using distributed training")

def paddle_pad_and_concatenate(tensor1, tensor2, padding_index=-100):
    """Concatenates `tensor1` and `tensor2` on first axis, applying padding on the second if necessary.

    Args:
        tensor1: First tensor; its dtype is used for the padded result.
        tensor2: Second tensor, placed after `tensor1` along axis 0.
        padding_index: Fill value for positions on axis 1 that neither input
            covers. Defaults to -100.

    Returns:
        `paddle.concat` of both inputs when `tensor1` is 1-d or the sizes on
        axis 1 match; otherwise a new tensor filled with `padding_index`
        into which both inputs are copied.
    """
    if len(tensor1.shape) == 1 or tensor1.shape[1] == tensor2.shape[1]:
        return paddle.concat((tensor1, tensor2), axis=0)

    # Let's figure out the new shape: rows add up, axis 1 takes the larger
    # size, trailing dimensions are taken from `tensor1`.
    new_shape = (tensor1.shape[0] + tensor2.shape[0], max(tensor1.shape[1], tensor2.shape[1])) + tuple(
        tensor1.shape[2:]
    )

    # Now let's fill the result tensor
    result = paddle.full(new_shape, padding_index, dtype=tensor1.dtype)
    result[: tensor1.shape[0], : tensor1.shape[1]] = tensor1
    result[tensor1.shape[0] :, : tensor2.shape[1]] = tensor2
    return result
def numpy_pad_and_concatenate(array1, array2, padding_index=-100):
    """Concatenates `array1` and `array2` on first axis, applying padding on the second if necessary.

    Args:
        array1: First array; its dtype is used for the padded result.
        array2: Second array, placed after `array1` along axis 0.
        padding_index: Fill value for positions on axis 1 that neither input
            covers. Defaults to -100.

    Returns:
        `np.concatenate` of both inputs when `array1` is 1-d or the sizes on
        axis 1 match; otherwise a new array filled with `padding_index`
        into which both inputs are copied.
    """
    if len(array1.shape) == 1 or array1.shape[1] == array2.shape[1]:
        return np.concatenate((array1, array2), axis=0)

    # Let's figure out the new shape: rows add up, axis 1 takes the larger
    # size, trailing dimensions are taken from `array1`.
    new_shape = (array1.shape[0] + array2.shape[0], max(array1.shape[1], array2.shape[1])) + array1.shape[2:]

    # Now let's fill the result tensor
    result = np.full_like(array1, padding_index, shape=new_shape)
    result[: array1.shape[0], : array1.shape[1]] = array1
    result[array1.shape[0] :, : array2.shape[1]] = array2
    return result
def nested_concat(tensors, new_tensors, padding_index=-100):
    """
    Concat the `new_tensors` to `tensors` on the first dim and pad them on the second if needed. Works for tensors or
    nested list/tuples of tensors.

    Args:
        tensors: A paddle tensor, a numpy array, or a nested list/tuple of them.
        new_tensors: Value of exactly the same type as `tensors`.
        padding_index: Fill value forwarded to the padding helpers.

    Returns:
        The concatenation, with the same nesting and container types as `tensors`.

    Raises:
        AssertionError: If `tensors` and `new_tensors` are of different types.
        TypeError: If a leaf is neither a paddle tensor nor a numpy array.
    """
    assert type(tensors) == type(
        new_tensors
    ), f"Expected `tensors` and `new_tensors` to have the same type but found {type(tensors)} and {type(new_tensors)}."
    if isinstance(tensors, (list, tuple)):
        # Recurse pairwise and rebuild the same container type.
        return type(tensors)(nested_concat(t, n, padding_index=padding_index) for t, n in zip(tensors, new_tensors))
    elif isinstance(tensors, paddle.Tensor):
        return paddle_pad_and_concatenate(tensors, new_tensors, padding_index=padding_index)
    elif isinstance(tensors, np.ndarray):
        return numpy_pad_and_concatenate(tensors, new_tensors, padding_index=padding_index)
    else:
        raise TypeError(f"Unsupported type for concatenation: got {type(tensors)}")
def nested_detach(tensors):
    """Detach `tensors` (even if it's a nested list/tuple of tensors).

    Lists and tuples are rebuilt with the same container type; any other
    value is returned as the result of its `.detach()` method.
    """
    if isinstance(tensors, (list, tuple)):
        return type(tensors)(nested_detach(t) for t in tensors)
    return tensors.detach()
def nested_numpify(tensors):
    """Numpify `tensors` (even if it's a nested list/tuple of tensors).

    Lists and tuples are rebuilt with the same container type. Each leaf is
    moved to the CPU, cast to float32 if it is float16, and converted with
    `.numpy()`.
    """
    if isinstance(tensors, (list, tuple)):
        return type(tensors)(nested_numpify(t) for t in tensors)
    t = tensors.cpu()
    if t.dtype == paddle.float16:
        t = t.cast(paddle.float32)
    # NOTE(review): the second `.cpu()` is kept from the original; it may be
    # redundant if the cast result already lives on the CPU.
    return t.cpu().numpy()
def nested_truncate(tensors, limit):
    """Truncate `tensors` at `limit` (even if it's a nested list/tuple of tensors).

    Lists and tuples are rebuilt with the same container type; every other
    value is sliced as `value[:limit]`.
    """
    if isinstance(tensors, (list, tuple)):
        return type(tensors)(nested_truncate(t, limit) for t in tensors)
    return tensors[:limit]
def distributed_isfile(filename):
    """Check all machine nodes. return False if no machine have such file.

    With a single trainer (`PADDLE_TRAINERS_NUM` <= 1) this is just
    `os.path.isfile`. Otherwise the process with `PADDLE_RANK_IN_NODE` == 0
    on each node contributes 1 if the file exists locally, and the counts
    are summed with an all-reduce.

    Returns:
        bool: True if the file exists on at least one node.
    """
    trainers_num = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
    if trainers_num <= 1:
        return os.path.isfile(filename)

    local_rank = int(os.getenv("PADDLE_RANK_IN_NODE", 0))
    file_count = paddle.zeros([1], dtype="int64")
    if local_rank == 0 and os.path.isfile(filename):
        file_count += 1

    paddle.distributed.all_reduce(file_count)
    # Return a plain bool so both branches have the same return type.
    return bool(file_count.item() >= 1)


def distributed_file(filename):
    """Make `filename` available on every node by copying it from a node that has it.

    With a single trainer the name is returned unchanged. Otherwise the
    lowest global rank among node-local rank 0 processes that hold the file
    reads it and broadcasts its bytes; node-local rank 0 processes that lack
    the file write it to the same path.

    Returns:
        The unchanged `filename`.
    """
    trainers_num = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
    if trainers_num <= 1:
        return filename

    local_rank = int(os.getenv("PADDLE_RANK_IN_NODE", 0))
    # Sentinel value: processes holding the file replace it with their own
    # global rank, so the minimum below selects the lowest such rank.
    # NOTE(review): if no process has the file, `src` stays at 2**20.
    found_file = paddle.to_tensor([2**20], dtype="int64")
    if local_rank == 0 and os.path.isfile(filename):
        found_file = paddle.to_tensor([paddle.distributed.get_rank()], dtype="int64")

    tensor_list = []
    paddle.distributed.all_gather(tensor_list, found_file)
    src = paddle.min(paddle.concat(tensor_list)).item()

    file_object_list = [None]
    if paddle.distributed.get_rank() == src:
        # Context manager so the handle is closed right after reading.
        with open(filename, "rb") as f:
            file_object_list = [f.read()]

    paddle.distributed.broadcast_object_list(file_object_list, src=src)
    file_object = file_object_list[0]

    if local_rank == 0 and not os.path.isfile(filename):
        dirname = os.path.dirname(filename)
        # A bare file name has an empty dirname; os.makedirs("") would raise.
        if dirname:
            os.makedirs(dirname, exist_ok=True)
        with open(filename, "wb") as f:
            f.write(file_object)

    paddle.distributed.barrier()

    return filename


# Lightweight stand-in for a tensor: only the metadata needed to allocate it.
TensorHolder = collections.namedtuple("TensorHolder", ["shape", "dtype", "name"])


def nested_reduce_tensor(tensor):
    """Replace every paddle tensor in a nested structure with a `TensorHolder`.

    Dicts are shallow-copied before their values are replaced, lists and
    tuples are rebuilt with the same type, and any other value is returned
    unchanged.
    """
    if isinstance(tensor, dict):
        # copy tensor since it will be inplace modified dict
        tensor = copy.copy(tensor)
        for key in list(tensor.keys()):
            tensor[key] = nested_reduce_tensor(tensor[key])
    if isinstance(tensor, (tuple, list)):
        return type(tensor)(nested_reduce_tensor(t) for t in tensor)

    if isinstance(tensor, paddle.Tensor):
        return TensorHolder(tensor.shape, tensor.dtype, tensor.name)

    return tensor


def nested_empty_tensor(tensor):
    """Replace every `TensorHolder` in a nested structure with an empty tensor.

    Each tensor is allocated with the holder's shape, dtype and name. Dict
    values are replaced in place; lists and tuples are rebuilt with the same
    type; any other value is returned unchanged.
    """
    # TensorHolder is tuple, so it must be recognised before plain tuples.
    if isinstance(tensor, TensorHolder):
        t = paddle.empty(tensor.shape, dtype=tensor.dtype, name=tensor.name)
        t.name = tensor.name
        return t

    if isinstance(tensor, dict):
        for key in list(tensor.keys()):
            tensor[key] = nested_empty_tensor(tensor[key])
    # Tuples are handled as well, matching what nested_reduce_tensor traverses.
    if isinstance(tensor, (list, tuple)):
        return type(tensor)(nested_empty_tensor(t) for t in tensor)

    return tensor


def nested_broadcast_tensor(tensor, src=0, group=None):
    """Broadcast every paddle tensor in a nested structure from rank `src`.

    Args:
        tensor: A tensor, dict, list or tuple (possibly nested).
        src: Source rank passed to `paddle.distributed.broadcast`.
        group: Communication group passed to `paddle.distributed.broadcast`.

    Returns:
        The structure itself; dict values are replaced in place, lists and
        tuples are rebuilt with the same type.
    """
    if isinstance(tensor, dict):
        for key in list(tensor.keys()):
            tensor[key] = nested_broadcast_tensor(tensor[key], src=src, group=group)
    # Tuples are handled as well, matching what nested_reduce_tensor traverses.
    if isinstance(tensor, (list, tuple)):
        return type(tensor)(nested_broadcast_tensor(t, src=src, group=group) for t in tensor)

    if isinstance(tensor, paddle.Tensor):
        paddle.distributed.broadcast(tensor, src=src, group=group, sync_op=True)
    return tensor


def broadcast_dp_optimizer(state_dict):
    """Broadcast an optimizer state dict from the source rank of the data parallel group.

    The source rank first sends the structure of `state_dict` with tensors
    replaced by `TensorHolder` metadata. The other ranks allocate matching
    empty tensors, and then the tensor contents are broadcast one by one.

    Args:
        state_dict: The state dict on the source rank; expected to be None
            on all other ranks (a warning is logged otherwise).

    Returns:
        `state_dict` unchanged when the world size or the data parallel
        group size is <= 1, otherwise the broadcast state dict.
    """
    if paddle.distributed.get_world_size() <= 1:
        return state_dict

    logger.info("Start broadcast optimizer in data parallel group.")
    try:
        hcg = fleet.get_hybrid_communicate_group()
        dp_group = hcg.get_data_parallel_group()
        src_rank = hcg.get_data_parallel_group_src_rank()
        process_rank = paddle.distributed.get_rank()
        # Don't broadcast optimizer for dp rank is 1.
        if dp_group.nranks <= 1:
            return state_dict
    except Exception:
        # No usable hybrid communicate group: fall back to the default
        # group with rank 0 as the source.
        dp_group = None
        src_rank = 0
        process_rank = paddle.distributed.get_rank()

    if process_rank == src_rank:
        if state_dict is None:
            logger.warning(
                f"Your local rank {paddle.distributed.get_rank()} must have a state_dict. dp_rank:{process_rank}, src_rank:{src_rank}"
            )
        fake_state_dict = [nested_reduce_tensor(state_dict)]
    else:
        if state_dict is not None:
            logger.warning(
                f"Your local rank {paddle.distributed.get_rank()}  are forbidden to have a state_dict. dp_rank:{process_rank}, src_rank:{src_rank}"
            )
        fake_state_dict = [None]

    # Step 1: share the structure and tensor metadata.
    paddle.distributed.broadcast_object_list(
        fake_state_dict,
        src=src_rank,
        group=dp_group,
    )
    fake_state_dict = fake_state_dict[0]
    if process_rank != src_rank:
        state_dict = nested_empty_tensor(fake_state_dict)

    # Step 2: share the tensor contents.
    state_dict = nested_broadcast_tensor(state_dict, src=src_rank, group=dp_group)

    return state_dict