Source code for paddlenlp.trainer.trainer_utils

# Copyright 2020-present the HuggingFace Inc. team.
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This file is modified from
#  https://github.com/huggingface/transformers/blob/main/src/transformers/trainer_utils.py

"""
Utilities for the Trainer class.
"""
import datetime
import gc
import inspect
import json
import math
import os
import random
import re
import threading
import time
from contextlib import contextmanager
from enum import Enum
from typing import Dict, List, NamedTuple, Optional, Tuple, Union

import numpy as np
import paddle
from paddle.distributed import fleet
from paddle.distributed.fleet.meta_parallel import get_rng_state_tracker
from paddle.io import IterableDataset
from paddle.optimizer.lr import LambdaDecay

from paddlenlp.ops import Topology

from ..trainer.argparser import strtobool
from ..transformers.tokenizer_utils_base import BatchEncoding
from ..utils.import_utils import is_paddle_cuda_available, is_psutil_available
from ..utils.log import logger

__all__ = [
    "TrainOutput",
    "PredictionOutput",
    "EvalPrediction",
    "IntervalStrategy",
    "SchedulerType",
    "set_seed",
    "speed_metrics",
    "get_last_checkpoint",
    "get_scheduler",
    "set_hyrbid_parallel_seed",
]


def _get_distributed_seeds(seed: int = 1234, topo: Topology = None):
    """
    Get the seeds from distributed environment strategy.
    Args:
        seed (:obj:`int`, `optional`, defaults to 1234): The seeds for initializing distributed training.
        topo (:obj:`Topology`, `optional`, defaults to None): The topology of hybrid parallel in semi-auto mode.
    Returns:
        Tuple[int, int, int]: The global seed, local seed and random seed, respectively.
    """

    # NOTE: For the parameter-init seed:
    # seed: the same across dp / mp-undistributed-parameter / sharding groups; different for the others.
    # For the compute seed (dropout):
    # global seed: the same only within an mp group.
    # local seed: different for every rank.
    hcg = None
    if hasattr(fleet.fleet, "_hcg") and topo is None:
        hcg = fleet.get_hybrid_communicate_group()

    if topo is not None and paddle.distributed.get_world_size() > 1:
        dp_rank = topo.dp_info.rank
        dp_size = topo.dp_info.size

        pp_rank = topo.pp_info.rank
        pp_size = topo.pp_info.size

        mp_rank = topo.mp_info.rank
        mp_size = topo.mp_info.size

        sep_rank = topo.sep_info.rank
        sep_size = topo.sep_info.size

        sharding_rank = topo.sharding_info.rank
    elif hcg is not None and paddle.distributed.get_world_size() > 1:
        # obtain rank message of hybrid parallel

        mp_rank = hcg.get_model_parallel_rank()
        mp_size = hcg.get_model_parallel_world_size()

        if hasattr(hcg, "get_sep_parallel_rank"):
            sep_rank = hcg.get_sep_parallel_rank()
            sep_size = hcg.get_sep_parallel_world_size()
        else:
            sep_rank, sep_size = 0, 1

        pp_rank = hcg.get_stage_id()
        pp_size = hcg.get_pipe_parallel_world_size()

        dp_rank = hcg.get_data_parallel_rank()
        dp_size = hcg.get_data_parallel_world_size()

        sharding_rank = hcg.get_sharding_parallel_rank()
    else:
        mp_rank, mp_size = 0, 1
        sep_rank, sep_size = 0, 1
        pp_rank, pp_size = 0, 1
        dp_rank, dp_size = 0, 1
        sharding_rank, _ = 0, 1

    seed_offset = seed
    global_seed = (
        seed_offset
        + sep_rank * (mp_size)
        + pp_rank * (mp_size * sep_size)
        + dp_rank * (mp_size * sep_size * pp_size)
        + sharding_rank * (mp_size * sep_size * pp_size * dp_size)
    )

    seed_offset += paddle.distributed.get_world_size()
    local_seed = (
        seed_offset
        + mp_rank
        + sep_rank * (mp_size)
        + pp_rank * (mp_size * sep_size)
        + dp_rank * (mp_size * sep_size * pp_size)
        + sharding_rank * (mp_size * sep_size * pp_size * dp_size)
    )

    # NOTE: the commented seeds are set only for precision validation
    random_seed = seed + 100 * pp_rank

    return global_seed, local_seed, random_seed


def set_seed(seed: int = 1234, topo=None):
    global_seed, local_seed, random_seed = _get_distributed_seeds(seed, topo)

    tracker = get_rng_state_tracker()
    if "global_seed" not in tracker.states_ and global_seed not in tracker.seeds_:
        tracker.add("global_seed", global_seed)

    if "local_seed" not in tracker.states_ and local_seed not in tracker.seeds_:
        tracker.add("local_seed", local_seed)

    paddle.seed(global_seed)
    random.seed(random_seed)
    np.random.seed(random_seed)

    logger.info(
        "The global seed is set to {}, local seed is set to {} and "
        "random seed is set to {}.".format(global_seed, local_seed, random_seed)
    )
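
# NOTE: Illustrative usage sketch (not part of the original module), kept as comments so
# import-time behavior is unchanged. Every process calls `set_seed` with the same base seed;
# each rank then derives its own global/local/random seeds from its position in the hybrid
# parallel topology:
#
#     from paddlenlp.trainer.trainer_utils import set_seed
#
#     set_seed(42)  # typically called after fleet.init() in a distributed job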


def _switch_mode(mode="dynamic"):
    assert mode in ["dynamic", "static"]
    if mode == "dynamic":
        paddle.disable_static()
    else:
        paddle.enable_static()


@contextmanager
def _exec_mode_guard(mode="dynamic"):
    origin_mode = "dynamic" if paddle.in_dynamic_mode() else "static"
    _switch_mode(mode)
    try:
        yield
    finally:
        _switch_mode(origin_mode)


class ExplicitEnum(Enum):
    """
    Enum with more explicit error message for missing values.
    """

    @classmethod
    def _missing_(cls, value):
        raise ValueError(
            f"{value} is not a valid {cls.__name__}, please select one of {list(cls._value2member_map_.keys())}"
        )



class EvalPrediction(NamedTuple):
    """
    Evaluation output (always contains labels), to be used to compute metrics.

    Parameters:
        predictions (`np.ndarray`): Predictions of the model.
        label_ids (`np.ndarray`): Targets to be matched.
    """

    predictions: Union[np.ndarray, Tuple[np.ndarray]]
    label_ids: Union[np.ndarray, Tuple[np.ndarray]]


class EvalLoopOutput(NamedTuple):
    predictions: Union[np.ndarray, Tuple[np.ndarray]]
    label_ids: Optional[Union[np.ndarray, Tuple[np.ndarray]]]
    metrics: Optional[Dict[str, float]]
    num_samples: Optional[int]


class PredictionOutput(NamedTuple):
    predictions: Union[np.ndarray, Tuple[np.ndarray]]
    label_ids: Optional[Union[np.ndarray, Tuple[np.ndarray]]]
    metrics: Optional[Dict[str, float]]


class TrainOutput(NamedTuple):
    global_step: int
    training_loss: float
    metrics: Dict[str, float]


PREFIX_CHECKPOINT_DIR = "checkpoint"
_re_checkpoint = re.compile(r"^" + PREFIX_CHECKPOINT_DIR + r"\-(\d+)$")


def get_last_checkpoint(folder):
    content = os.listdir(folder)
    checkpoints = [
        path
        for path in content
        if _re_checkpoint.search(path) is not None and os.path.isdir(os.path.join(folder, path))
    ]
    if len(checkpoints) == 0:
        return
    if strtobool(os.getenv("FLAG_LLM_PDC", "False")):
        for i in sorted(checkpoints, key=lambda x: int(_re_checkpoint.search(x).groups()[0]), reverse=True):
            current_path = os.path.join(folder, i)
            # make sure the checkpoint is valid
            if os.path.exists(os.path.join(current_path, ".checkpoint_done")):
                return current_path
        return
    else:
        return os.path.join(folder, max(checkpoints, key=lambda x: int(_re_checkpoint.search(x).groups()[0])))
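
# NOTE: Illustrative usage sketch (not part of the original module), kept as comments.
# `get_last_checkpoint` looks for sub-directories named `checkpoint-<global_step>` and returns the
# one with the largest step (or, when FLAG_LLM_PDC is enabled, the newest one that contains a
# `.checkpoint_done` marker). `output_dir` below is a placeholder path:
#
#     last_checkpoint = get_last_checkpoint("output_dir")
#     if last_checkpoint is not None:
#         print(f"resuming from {last_checkpoint}")  # e.g. output_dir/checkpoint-2000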


class IntervalStrategy(ExplicitEnum):
    NO = "no"
    STEPS = "steps"
    EPOCH = "epoch"


class EvaluationStrategy(ExplicitEnum):
    NO = "no"
    STEPS = "steps"
    EPOCH = "epoch"


class OptimizerNames(ExplicitEnum):
    """
    Stores the acceptable string identifiers for optimizers.
    """

    ADAMW = "adamw"
    ADAFACTOR = "adafactor"


class ShardingOption(ExplicitEnum):
    """
    Sharding Option
    OP for sharding optimizer state
    GRAD for sharding gradients
    FULL_SHARD for sharding optimizer gradient and parameter
    OFFLOAD means offload to cpu.
    """

    SHARD_OP = "stage1"
    SHARD_GRAD_OP = "stage2"
    FULL_SHARD = "stage3"
    # NO_SHARD = "no"
    OFFLOAD = "offload"


def is_main_process(local_rank):
    """
    Whether or not the current process is the local process, based on `xm.get_ordinal()` (for TPUs) first, then on
    `local_rank`.
    """
    return local_rank in [-1, 0]


def total_processes_number(local_rank):
    """
    Return the number of processes launched in parallel. Works with `paddle.distributed` and TPUs.
    """
    if local_rank != -1:
        import paddle

        return paddle.distributed.get_world_size()
    return 1


def speed_metrics(split, start_time, num_samples=None, num_steps=None, seq_length=None):
    """
    Measure and return speed performance metrics.

    This function requires a time snapshot `start_time` before the operation to be measured starts and this function
    should be run immediately after the operation to be measured has completed.

    Args:
    - split: name to prefix metric (like train, eval, test...)
    - start_time: operation start time
    - num_samples: number of samples processed
    """
    runtime = time.time() - start_time
    result = {f"{split}_runtime": round(runtime, 4)}
    if num_samples is not None:
        samples_per_second = num_samples / runtime
        result[f"{split}_samples_per_second"] = round(samples_per_second, 4)
        if seq_length is not None:
            tokens_per_second_per_device = samples_per_second * seq_length / paddle.distributed.get_world_size()
            result[f"{split}_tokens_per_second_per_device"] = round(tokens_per_second_per_device, 4)
    if num_steps is not None:
        steps_per_second = num_steps / runtime
        result[f"{split}_steps_per_second"] = round(steps_per_second, 4)
    return result
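
# NOTE: Illustrative usage sketch (not part of the original module), kept as comments.
# `speed_metrics` is called right after the timed operation finishes, e.g. after an evaluation
# pass over 1000 samples in 20 steps:
#
#     start_time = time.time()
#     # ... run evaluation ...
#     metrics = speed_metrics("eval", start_time, num_samples=1000, num_steps=20)
#     # -> {"eval_runtime": ..., "eval_samples_per_second": ..., "eval_steps_per_second": ...}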


class SchedulerType(ExplicitEnum):
    LINEAR = "linear"
    COSINE = "cosine"
    CONSTANT = "constant"
    CONSTANT_WITH_WARMUP = "constant_with_warmup"
    POLYNOMIAL = "polynomial"


def get_constant_schedule(learning_rate: float, last_epoch: int = -1):
    """
    Create a schedule with a constant learning rate, using the learning rate set in optimizer.

    Args:
        learning_rate (`float`):
            The initial learning rate. It is a python float number.
        last_epoch (`int`, *optional*, defaults to -1):
            The index of the last epoch when resuming training.

    Return:
        `paddle.optimizer.lr.LambdaDecay` with the appropriate schedule.
    """
    return LambdaDecay(learning_rate, lambda _: 1, last_epoch=last_epoch)


def get_constant_schedule_with_warmup(learning_rate: float, num_warmup_steps: int, last_epoch: int = -1):
    """
    Create a schedule with a constant learning rate preceded by a warmup period during which the learning rate
    increases linearly between 0 and the initial lr set in the optimizer.

    Args:
        learning_rate (`float`):
            The initial learning rate. It is a python float number.
        num_warmup_steps (`int`):
            The number of steps for the warmup phase.
        last_epoch (`int`, *optional*, defaults to -1):
            The index of the last epoch when resuming training.

    Return:
        `paddle.optimizer.lr.LambdaDecay` with the appropriate schedule.
    """

    def lr_lambda(current_step: int):
        if current_step < num_warmup_steps:
            return float(current_step) / float(max(1.0, num_warmup_steps))
        return 1.0

    return LambdaDecay(learning_rate, lr_lambda, last_epoch=last_epoch)


def get_linear_schedule_with_warmup(learning_rate: float, num_warmup_steps, num_training_steps, last_epoch=-1):
    """
    Create a schedule with a learning rate that decreases linearly from the initial lr set in the optimizer to 0,
    after a warmup period during which it increases linearly from 0 to the initial lr set in the optimizer.

    Args:
        learning_rate (`float`):
            The initial learning rate. It is a python float number.
        num_warmup_steps (`int`):
            The number of steps for the warmup phase.
        num_training_steps (`int`):
            The total number of training steps.
        last_epoch (`int`, *optional*, defaults to -1):
            The index of the last epoch when resuming training.

    Return:
        `paddle.optimizer.lr.LambdaDecay` with the appropriate schedule.
    """

    def lr_lambda(current_step: int):
        if current_step < num_warmup_steps:
            return float(current_step) / float(max(1, num_warmup_steps))
        return max(
            0.0, float(num_training_steps - current_step) / float(max(1, num_training_steps - num_warmup_steps))
        )

    return LambdaDecay(learning_rate, lr_lambda, last_epoch)


def get_cosine_schedule_with_warmup(
    learning_rate: float, num_warmup_steps: int, num_training_steps: int, num_cycles: float = 0.5, last_epoch: int = -1
):
    """
    Create a schedule with a learning rate that decreases following the values of the cosine function between the
    initial lr set in the optimizer to 0, after a warmup period during which it increases linearly between 0 and the
    initial lr set in the optimizer.

    Args:
        learning_rate (`float`):
            The initial learning rate. It is a python float number.
        num_warmup_steps (`int`):
            The number of steps for the warmup phase.
        num_training_steps (`int`):
            The total number of training steps.
        num_cycles (`float`, *optional*, defaults to 0.5):
            The number of waves in the cosine schedule (the default is to just decrease from the max value to 0
            following a half-cosine).
        last_epoch (`int`, *optional*, defaults to -1):
            The index of the last epoch when resuming training.

    Return:
        `paddle.optimizer.lr.LambdaDecay` with the appropriate schedule.
    """

    def lr_lambda(current_step):
        if current_step < num_warmup_steps:
            return float(current_step) / float(max(1, num_warmup_steps))
        progress = float(current_step - num_warmup_steps) / float(max(1, num_training_steps - num_warmup_steps))
        return max(0.0, 0.5 * (1.0 + math.cos(math.pi * float(num_cycles) * 2.0 * progress)))

    return LambdaDecay(learning_rate, lr_lambda, last_epoch)


def get_polynomial_decay_schedule_with_warmup(
    learning_rate: float,
    num_warmup_steps: int,
    num_training_steps: int,
    lr_end: float = 1e-7,
    power: float = 1.0,
    last_epoch: int = -1,
):
    """
    Create a schedule with a learning rate that decreases as a polynomial decay from the initial lr set in the
    optimizer to end lr defined by *lr_end*, after a warmup period during which it increases linearly from 0 to the
    initial lr set in the optimizer.

    Args:
        learning_rate (`float`):
            The base learning rate. It is a python float number.
        num_warmup_steps (`int`):
            The number of steps for the warmup phase.
        num_training_steps (`int`):
            The total number of training steps.
        lr_end (`float`, *optional*, defaults to 1e-7):
            The end LR.
        power (`float`, *optional*, defaults to 1.0):
            Power factor.
        last_epoch (`int`, *optional*, defaults to -1):
            The index of the last epoch when resuming training.

    Note: *power* defaults to 1.0 as in the fairseq implementation, which in turn is based on the original BERT
    implementation at
    https://github.com/google-research/bert/blob/f39e881b169b9d53bea03d2d341b31707a6c052b/optimization.py#L37

    Return:
        `paddle.optimizer.lr.LambdaDecay` with the appropriate schedule.
    """

    lr_init = learning_rate
    if not (lr_init > lr_end):
        raise ValueError(f"lr_end ({lr_end}) must be smaller than initial lr ({lr_init})")

    def lr_lambda(current_step: int):
        if current_step < num_warmup_steps:
            return float(current_step) / float(max(1, num_warmup_steps))
        elif current_step > num_training_steps:
            return lr_end / lr_init  # as LambdaLR multiplies by lr_init
        else:
            lr_range = lr_init - lr_end
            decay_steps = num_training_steps - num_warmup_steps
            pct_remaining = 1 - (current_step - num_warmup_steps) / decay_steps
            decay = lr_range * pct_remaining**power + lr_end
            return decay / lr_init  # as LambdaLR multiplies by lr_init

    return LambdaDecay(learning_rate, lr_lambda, last_epoch)


TYPE_TO_SCHEDULER_FUNCTION = {
    SchedulerType.LINEAR: get_linear_schedule_with_warmup,
    SchedulerType.COSINE: get_cosine_schedule_with_warmup,
    SchedulerType.CONSTANT: get_constant_schedule,
    SchedulerType.POLYNOMIAL: get_polynomial_decay_schedule_with_warmup,
    SchedulerType.CONSTANT_WITH_WARMUP: get_constant_schedule_with_warmup,
}


def get_scheduler(
    name: Union[str, SchedulerType],
    learning_rate: float,
    num_warmup_steps: Optional[int] = None,
    num_training_steps: Optional[int] = None,
    num_cycles: Optional[float] = 0.5,
    lr_end: Optional[float] = 1e-7,
    power: Optional[float] = 1.0,
):
    """
    Unified API to get any scheduler from its name.

    Args:
        name (`str` or `SchedulerType`):
            The name of the scheduler to use.
        learning_rate (`float`):
            The initial learning rate. It is a python float number.
        num_warmup_steps (`int`, *optional*):
            The number of warmup steps to do. This is not required by all schedulers (hence the argument being
            optional), the function will raise an error if it's unset and the scheduler type requires it.
        num_training_steps (`int`, *optional*):
            The number of training steps to do. This is not required by all schedulers (hence the argument being
            optional), the function will raise an error if it's unset and the scheduler type requires it.
        num_cycles (`float`, *optional*):
            The number of waves in the cosine scheduler (the default is to just decrease from the max value to 0
            following a half-cosine). This is not required by all schedulers (hence the argument being optional).
        lr_end (`float`, *optional*):
            The end LR in the polynomial scheduler. This is not required by all schedulers (hence the argument being
            optional).
        power (`float`, *optional*):
            The power factor in the polynomial scheduler. This is not required by all schedulers (hence the argument
            being optional).
    """
    name = SchedulerType(name)
    schedule_func = TYPE_TO_SCHEDULER_FUNCTION[name]
    if name == SchedulerType.CONSTANT:
        return schedule_func(learning_rate)

    # All other schedulers require `num_warmup_steps`
    if num_warmup_steps is None:
        raise ValueError(f"{name} requires `num_warmup_steps`, please provide that argument.")

    if name == SchedulerType.CONSTANT_WITH_WARMUP:
        return schedule_func(learning_rate, num_warmup_steps=num_warmup_steps)

    # All other schedulers require `num_training_steps`
    if num_training_steps is None:
        raise ValueError(f"{name} requires `num_training_steps`, please provide that argument.")

    if name == SchedulerType.COSINE:
        return schedule_func(
            learning_rate,
            num_warmup_steps=num_warmup_steps,
            num_training_steps=num_training_steps,
            num_cycles=num_cycles,
        )

    if name == SchedulerType.POLYNOMIAL:
        return schedule_func(
            learning_rate,
            num_warmup_steps=num_warmup_steps,
            num_training_steps=num_training_steps,
            lr_end=lr_end,
            power=power,
        )

    return schedule_func(learning_rate, num_warmup_steps=num_warmup_steps, num_training_steps=num_training_steps)
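
# NOTE: Illustrative usage sketch (not part of the original module), kept as comments.
# `get_scheduler` dispatches on the scheduler name and forwards only the arguments that scheduler
# needs; the result is a `paddle.optimizer.lr.LRScheduler` that is stepped once per optimizer
# update (`model` below is a placeholder for any `paddle.nn.Layer`):
#
#     lr_scheduler = get_scheduler(
#         "linear", learning_rate=3e-5, num_warmup_steps=100, num_training_steps=1000
#     )
#     optimizer = paddle.optimizer.AdamW(learning_rate=lr_scheduler, parameters=model.parameters())
#     # in the training loop, after optimizer.step():
#     #     lr_scheduler.step()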


def _secs2timedelta(secs):
    """
    convert seconds to hh:mm:ss.msec, msecs rounded to 2 decimals
    """
    msec = int(abs(secs - int(secs)) * 100)
    return f"{datetime.timedelta(seconds=int(secs))}.{msec:02d}"


def metrics_format(self, metrics: Dict[str, float]) -> Dict[str, float]:
    """
    Reformat Trainer metrics values to a human-readable format

    Args:
        metrics (`Dict[str, float]`):
            The metrics returned from train/evaluate/predict

    Returns:
        metrics (`Dict[str, float]`): The reformatted metrics
    """

    metrics_copy = metrics.copy()
    for k, v in metrics_copy.items():
        if "_mem_" in k:
            metrics_copy[k] = f"{ v >> 20 }MB"
        elif "_runtime" in k:
            metrics_copy[k] = _secs2timedelta(v)
        elif k == "total_flos":
            metrics_copy[k] = f"{ int(v) >> 30 }GF"
        elif isinstance(metrics_copy[k], float):
            metrics_copy[k] = round(v, 4)

    return metrics_copy


def log_metrics(self, split, metrics):
    """
    Log metrics in a specially formatted way

    Under distributed environment this is done only for a process with rank 0.

    Args:
        split (`str`):
            Mode/split name: one of `train`, `eval`, `test`
        metrics (`Dict[str, float]`):
            The metrics returned from train/evaluate/predict
    """
    if not self.is_world_process_zero():
        return

    logger.info(f"***** {split} metrics *****")
    metrics_formatted = self.metrics_format(metrics)
    k_width = max(len(str(x)) for x in metrics_formatted.keys())
    v_width = max(len(str(x)) for x in metrics_formatted.values())
    for key in sorted(metrics_formatted.keys()):
        logger.info(f" {key: <{k_width}} = {metrics_formatted[key]:>{v_width}}")


def save_metrics(self, split, metrics, combined=True):
    """
    Save metrics into a json file for that split, e.g. `train_results.json`.

    Under distributed environment this is done only for a process with rank 0.

    Args:
        split (`str`):
            Mode/split name: one of `train`, `eval`, `test`, `all`
        metrics (`Dict[str, float]`):
            The metrics returned from train/evaluate/predict
        combined (`bool`, *optional*, defaults to `True`):
            Creates combined metrics by updating `all_results.json` with metrics of this call

    To understand the metrics please read the docstring of [`~Trainer.log_metrics`]. The only difference is that raw
    unformatted numbers are saved in the current method.
    """
    if not self.is_world_process_zero():
        return

    path = os.path.join(self.args.output_dir, f"{split}_results.json")
    with open(path, "w") as f:
        json.dump(metrics, f, indent=4, sort_keys=True)

    if combined:
        path = os.path.join(self.args.output_dir, "all_results.json")
        if os.path.exists(path):
            with open(path, "r") as f:
                all_metrics = json.load(f)
        else:
            all_metrics = {}

        all_metrics.update(metrics)
        with open(path, "w") as f:
            json.dump(all_metrics, f, indent=4, sort_keys=True)


def save_state(self):
    """
    Saves the Trainer state, since Trainer.save_model saves only the tokenizer with the model

    Under distributed environment this is done only for a process with rank 0.
    """
    if not self.is_world_process_zero():
        return

    path = os.path.join(self.args.output_dir, "trainer_state.json")
    self.state.save_to_json(path)


def has_length(dataset):
    """
    Checks if the dataset implements __len__() and it doesn't raise an error
    """
    try:
        return len(dataset) is not None
    except (TypeError, ValueError, RuntimeError):
        # TypeError: len() of unsized object
        return False


class TrainerMemoryTracker:
    """
    A helper class that tracks cpu and gpu memory.

    This class will silently skip unless `psutil` is available. Install with `pip install psutil`.

    When a stage completes, it can pass metrics dict to update with the memory metrics gathered during this stage.

    Example :

    ```python
    self._memory_tracker = TrainerMemoryTracker(self.args.skip_memory_metrics)
    self._memory_tracker.start()
    # code ...
    metrics = {"train_runtime": 10.5}
    self._memory_tracker.stop_and_update_metrics(metrics)
    ```

    At the moment GPU tracking is only for `paddle`.

    To understand this class' intricacies please read the documentation of [`~Trainer.log_metrics`].
    """

    # map trainer methods to metrics prefix
    stages = {
        "__init__": "init",
        "train": "train",
        "_inner_training_loop": "train",
        "evaluate": "eval",
        "predict": "test",
    }

    def __init__(self, skip_memory_metrics=False):
        self.skip_memory_metrics = skip_memory_metrics

        if not is_psutil_available():
            # soft dependency on psutil
            self.skip_memory_metrics = True

        if self.skip_memory_metrics:
            return

        import psutil  # noqa

        if is_paddle_cuda_available():
            import paddle

            self.paddle = paddle
            self.gpu = {}
        else:
            self.paddle = None

        self.process = psutil.Process()

        self.cur_stage = None
        self.cpu = {}
        self.init_reported = False

    def derive_stage(self):
        """derives the stage/caller name automatically"""
        caller = inspect.currentframe().f_back.f_back.f_code.co_name
        if caller in self.stages:
            return self.stages[caller]
        else:
            raise ValueError(
                f"was called from {caller}, but only expect to be called from one of {self.stages.keys()}"
            )

    def cpu_mem_used(self):
        """get resident set size memory for the current process"""
        return self.process.memory_info().rss

    def peak_monitor_func(self):
        self.cpu_mem_used_peak = -1

        while True:
            self.cpu_mem_used_peak = max(self.cpu_mem_used(), self.cpu_mem_used_peak)

            # can't sleep or will not catch the peak right (this comment is here on purpose)
            # time.sleep(0.001)  # 1msec

            if not self.peak_monitoring:
                break

    def start(self):
        """start tracking for the caller's stage"""
        if self.skip_memory_metrics:
            return

        stage = self.derive_stage()
        # deal with nested calls of eval during train - simply ignore those
        if self.cur_stage is not None and self.cur_stage != stage:
            return

        self.cur_stage = stage

        gc.collect()

        if self.paddle is not None:
            # self.paddle.cuda.reset_peak_memory_stats()?
            self.paddle.device.cuda.empty_cache()

        # gpu
        if self.paddle is not None:
            self.gpu_mem_used_at_start = self.paddle.device.cuda.memory_allocated()

        # cpu
        self.cpu_mem_used_at_start = self.cpu_mem_used()

        self.peak_monitoring = True
        peak_monitor_thread = threading.Thread(target=self.peak_monitor_func)
        peak_monitor_thread.daemon = True
        peak_monitor_thread.start()

    def stop(self, stage):
        """stop tracking for the passed stage"""

        # deal with nested calls of eval during train - simply ignore those
        if self.cur_stage is not None and self.cur_stage != stage:
            return

        # this sends a signal to peak_monitor_func to complete its loop
        self.peak_monitoring = False

        # first ensure all objects get collected and their memory is freed
        gc.collect()

        if self.paddle is not None:
            self.paddle.device.cuda.empty_cache()

        # concepts:
        # - alloc_delta: the difference of allocated memory between the end and the start
        # - peaked_delta: the difference between the peak memory and the current memory
        # in order to know how much memory the measured code consumed one needs to sum these two

        # gpu
        if self.paddle is not None:
            self.gpu_mem_used_now = self.paddle.device.cuda.memory_allocated()
            self.gpu_mem_used_peak = self.paddle.device.cuda.max_memory_allocated()
            self.gpu[self.cur_stage] = dict(
                begin=self.gpu_mem_used_at_start,
                end=self.gpu_mem_used_now,
                alloc=(self.gpu_mem_used_now - self.gpu_mem_used_at_start),
                peaked=max(0, self.gpu_mem_used_peak - self.gpu_mem_used_now),
            )

        # cpu
        self.cpu_mem_used_now = self.cpu_mem_used()
        self.cpu[self.cur_stage] = dict(
            begin=self.cpu_mem_used_at_start,
            end=self.cpu_mem_used_now,
            alloc=(self.cpu_mem_used_now - self.cpu_mem_used_at_start),
            peaked=max(0, self.cpu_mem_used_peak - self.cpu_mem_used_now),
        )

        # reset - cycle finished
        self.cur_stage = None

    def update_metrics(self, stage, metrics):
        """updates the metrics"""
        if self.skip_memory_metrics:
            return

        # deal with nested calls of eval during train - simply ignore those
        if self.cur_stage is not None and self.cur_stage != stage:
            return

        if hasattr(self, "gpu_mem_used_peak"):
            metrics["gpu_mem_max_memory_allocated"] = self.gpu_mem_used_peak
            metrics["gpu_mem_max_memory_reserved"] = self.paddle.device.cuda.max_memory_reserved()

        # since we don't have a way to return init metrics, we push them into the first of train/val/predict
        stages = [stage]
        if not self.init_reported:
            stages.insert(0, "init")
            self.init_reported = True

        for stage in stages:
            for t in ["alloc", "peaked"]:
                if stage in self.cpu and t in self.cpu[stage]:
                    metrics[f"{stage}_mem_cpu_{t}_delta"] = self.cpu[stage][t]
                if self.paddle is not None and stage in self.gpu and t in self.gpu[stage]:
                    metrics[f"{stage}_mem_gpu_{t}_delta"] = self.gpu[stage][t]

        # if we need additional debug info, enable the following
        # for t in ["begin", "end"]:
        #     if stage in self.cpu and t in self.cpu[stage]:
        #         metrics[f"{stage}_mem_cpu_{t}"] = self.cpu[stage][t]
        #     if self.paddle is not None and stage in self.gpu and t in self.gpu[stage]:
        #         metrics[f"{stage}_mem_gpu_{t}"] = self.gpu[stage][t]

        # since memory can be allocated before init, and it might be difficult to track overall
        # memory usage, in particular for GPU, let's report memory usage at the point init was called
        if stages[0] == "init":
            metrics["before_init_mem_cpu"] = self.cpu["init"]["begin"]
            if self.paddle is not None:
                metrics["before_init_mem_gpu"] = self.gpu["init"]["begin"]
            # if we also wanted to report any additional memory allocations in between init and
            # whatever the next stage was we could also report this:
            # if self.cpu["init"]["end"] != self.cpu[stage]["begin"]:
            #     metrics[f"after_init_mem_cpu_delta"] = self.cpu[stage]["begin"] - self.cpu["init"]["end"]
            # if self.paddle is not None and self.gpu["init"]["end"] != self.gpu[stage]["begin"]:
            #     metrics[f"after_init_mem_gpu_delta"] = self.gpu[stage]["begin"] - self.gpu["init"]["end"]

    def stop_and_update_metrics(self, metrics=None):
        """combine stop and metrics update in one call for simpler code"""
        if self.skip_memory_metrics:
            return

        stage = self.derive_stage()
        self.stop(stage)

        # init doesn't have metrics to update so we just save that data for later stages to retrieve
        if metrics is not None:
            self.update_metrics(stage, metrics)


class IterableDatasetShard(IterableDataset):
    """
    Wraps a Paddle `IterableDataset` to generate samples for one of the processes only. Instances of this class will
    always yield a number of samples that is a round multiple of the actual batch size (which is `batch_size x
    num_processes`). Depending on the value of the `drop_last` attribute, it will either stop the iteration at the
    first batch that would be too small or loop with indices from the beginning.

    On two processes with an iterable dataset yielding `[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]` with a batch size of
    2:

    - the shard on process 0 will yield `[0, 1, 4, 5, 8, 9]` so will see batches `[0, 1]`, `[4, 5]`, `[8, 9]`
    - the shard on process 1 will yield `[2, 3, 6, 7, 10, 11]` so will see batches `[2, 3]`, `[6, 7]`, `[10, 11]`

    Args:
        dataset (`paddle.io.IterableDataset`):
            The dataset to split in several shards.
        batch_size (`int`, *optional*, defaults to 1):
            The size of the batches per shard.
        drop_last (`bool`, *optional*, defaults to `False`):
            Whether or not to drop the last incomplete batch or complete the last batches by using the samples from
            the beginning.
        num_processes (`int`, *optional*, defaults to 1):
            The number of processes running concurrently.
        process_index (`int`, *optional*, defaults to 0):
            The index of the current process.
        seed (`int`, *optional*, defaults to 0):
            A random seed that will be used for the random number generation in
            [`~trainer_utils.IterableDatasetShard.set_epoch`].
    """

    def __init__(
        self,
        dataset: IterableDataset,
        batch_size: int = 1,
        drop_last: bool = False,
        num_processes: int = 1,
        process_index: int = 0,
        seed: int = 0,
    ):
        self.dataset = dataset
        self.batch_size = batch_size
        self.drop_last = drop_last
        self.num_processes = num_processes
        self.process_index = process_index
        self.seed = seed
        self.epoch = 0
        self.num_examples = 0

    def set_epoch(self, epoch):
        self.epoch = epoch
        if hasattr(self.dataset, "set_epoch"):
            self.dataset.set_epoch(epoch)

    def __iter__(self):
        self.num_examples = 0
        # TODO: support generator seed in sampling.
        #
        # if (
        #     not hasattr(self.dataset, "set_epoch")
        #     and hasattr(self.dataset, "generator")
        #     and isinstance(self.dataset.generator, paddle.fluid.Generator)
        # ):
        #     self.dataset.generator.manual_seed(self.seed + self.epoch)
        real_batch_size = self.batch_size * self.num_processes
        process_slice = range(self.process_index * self.batch_size, (self.process_index + 1) * self.batch_size)

        first_batch = None
        current_batch = []
        for element in self.dataset:
            self.num_examples += 1
            current_batch.append(element)
            # Wait to have a full batch before yielding elements.
            if len(current_batch) == real_batch_size:
                for i in process_slice:
                    yield current_batch[i]
                if first_batch is None:
                    first_batch = current_batch.copy()
                current_batch = []

        # Finished if drop_last is True, otherwise complete the last batch with elements from the beginning.
        if not self.drop_last and len(current_batch) > 0:
            if first_batch is None:
                first_batch = current_batch.copy()
            while len(current_batch) < real_batch_size:
                current_batch += first_batch
            for i in process_slice:
                yield current_batch[i]

    def __len__(self):
        # Will raise an error if the underlying dataset is not sized.
        if self.drop_last:
            return (len(self.dataset) // (self.batch_size * self.num_processes)) * self.batch_size
        else:
            return math.ceil(len(self.dataset) / (self.batch_size * self.num_processes)) * self.batch_size


def find_batch_size(tensors):
    """
    Find the first dimension of a tensor in a nested list/tuple/dict of tensors.
    """
    if isinstance(tensors, (list, tuple)):
        for t in tensors:
            result = find_batch_size(t)
            if result is not None:
                return result
    elif isinstance(tensors, (dict, BatchEncoding)):
        for key, value in tensors.items():
            result = find_batch_size(value)
            if result is not None:
                return result
    elif isinstance(tensors, paddle.Tensor):
        return tensors.shape[0] if len(tensors.shape) >= 1 else None
    elif isinstance(tensors, np.ndarray):
        return tensors.shape[0] if len(tensors.shape) >= 1 else None


class RemoveColumnsCollator:
    """Wrap the data collator to remove unused columns before they are passed to the collator."""

    def __init__(
        self,
        data_collator,
        signature_columns,
        logger=None,
        model_name: Optional[str] = None,
        description: Optional[str] = None,
    ):
        self.data_collator = data_collator
        self.signature_columns = signature_columns
        self.logger = logger
        self.description = description
        self.model_name = model_name
        self.message_logged = False

    def _remove_columns(self, feature: dict) -> dict:
        if not isinstance(feature, dict):
            return feature
        if not self.message_logged and self.logger and self.model_name:
            ignored_columns = list(set(feature.keys()) - set(self.signature_columns))
            if len(ignored_columns) > 0:
                dset_description = "" if self.description is None else f"in the {self.description} set"
                self.logger.info(
                    f"The following columns {dset_description} don't have a corresponding argument in "
                    f"`{self.model_name}.forward` and have been ignored: {', '.join(ignored_columns)}."
                    f" If {', '.join(ignored_columns)} are not expected by `{self.model_name}.forward`, "
                    " you can safely ignore this message."
                )
                self.message_logged = True
        return {k: v for k, v in feature.items() if k in self.signature_columns}

    def __call__(self, features: List[dict]):
        features = [self._remove_columns(feature) for feature in features]
        return self.data_collator(features)


def set_hyrbid_parallel_seed(basic_seed, dataset_rank, tp_rank, pp_rank=0):
    from paddle.distributed.fleet.meta_parallel import get_rng_state_tracker

    random.seed(basic_seed + dataset_rank)
    np.random.seed(basic_seed + dataset_rank)
    paddle.seed(basic_seed + dataset_rank)

    # local_seed / global_seed is used to control dropout in ModelParallel
    local_seed = basic_seed + 59999 + tp_rank * 10 + pp_rank * 1000
    global_seed = basic_seed + 100003 + dataset_rank

    tracker = get_rng_state_tracker()
    if "global_seed" not in tracker.states_ and global_seed not in tracker.seeds_:
        tracker.add("global_seed", global_seed)
    if "local_seed" not in tracker.states_ and local_seed not in tracker.seeds_:
        tracker.add("local_seed", local_seed)
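
# NOTE: Illustrative usage sketch (not part of the original module), kept as comments.
# `IterableDatasetShard` wraps an iterable dataset so that each process only sees its own slice of
# every global batch, matching the example in the class docstring (`my_iterable_dataset` is a
# placeholder):
#
#     shard = IterableDatasetShard(my_iterable_dataset, batch_size=2, num_processes=2, process_index=0)
#     shard.set_epoch(0)
#     # iterating over `shard` on process 0 yields samples 0, 1, 4, 5, 8, 9, ...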