Source code for paddlenlp.trainer.trainer_utils

# Copyright 2020-present the HuggingFace Inc. team.
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This file is modified from
#  https://github.com/huggingface/transformers/blob/main/src/transformers/trainer_utils.py
"""
Utilities for the Trainer class.
"""
import datetime
import json
import os
import random
import re
import time
import math
from enum import Enum
from typing import Dict, NamedTuple, Optional, Tuple, Union, List

import numpy as np
import paddle
from paddle.io import IterableDataset
from paddle.optimizer.lr import LambdaDecay

from ..utils.log import logger
from ..transformers.tokenizer_utils_base import BatchEncoding

__all__ = [
    "TrainOutput",
    "PredictionOutput",
    "EvalPrediction",
    "IntervalStrategy",
    "SchedulerType",
    "set_seed",
    "speed_metrics",
    "get_last_checkpoint",
    "get_scheduler",
]


def set_seed(seed: int):
    random.seed(seed)
    np.random.seed(seed)
    paddle.seed(seed)
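

# A minimal usage sketch for `set_seed`: call it once, before the model and
# dataloaders are built, so the Python, NumPy, and Paddle RNGs all start from
# the same seed. The seed value below is an arbitrary example.
def _example_set_seed(seed: int = 42):
    set_seed(seed)
    # Draws from the three seeded generators are now reproducible across runs.
    return random.random(), np.random.rand(), paddle.rand([1])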


class ExplicitEnum(Enum):
    """
    Enum with more explicit error message for missing values.
    """

    @classmethod
    def _missing_(cls, value):
        raise ValueError(
            f"{value} is not a valid {cls.__name__}, please select one of {list(cls._value2member_map_.keys())}"
        )


class EvalPrediction(NamedTuple):
    """
    Evaluation output (always contains labels), to be used to compute metrics.

    Parameters:
        predictions (`np.ndarray`): Predictions of the model.
        label_ids (`np.ndarray`): Targets to be matched.
    """

    predictions: Union[np.ndarray, Tuple[np.ndarray]]
    label_ids: Union[np.ndarray, Tuple[np.ndarray]]

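
# Illustrative sketch: a metrics callback of the kind usually passed to the
# Trainer receives an `EvalPrediction` and returns a dict of metric values.
# The accuracy computation below is a minimal example that assumes
# classification logits in `predictions` and integer class ids in `label_ids`.
def _example_compute_metrics(eval_pred: EvalPrediction) -> Dict[str, float]:
    preds = np.argmax(eval_pred.predictions, axis=-1)
    accuracy = float((preds == eval_pred.label_ids).mean())
    return {"accuracy": accuracy}
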
class EvalLoopOutput(NamedTuple):
    predictions: Union[np.ndarray, Tuple[np.ndarray]]
    label_ids: Optional[Union[np.ndarray, Tuple[np.ndarray]]]
    metrics: Optional[Dict[str, float]]
    num_samples: Optional[int]

class PredictionOutput(NamedTuple):
    predictions: Union[np.ndarray, Tuple[np.ndarray]]
    label_ids: Optional[Union[np.ndarray, Tuple[np.ndarray]]]
    metrics: Optional[Dict[str, float]]

class TrainOutput(NamedTuple):
    global_step: int
    training_loss: float
    metrics: Dict[str, float]

PREFIX_CHECKPOINT_DIR = "checkpoint"
_re_checkpoint = re.compile(r"^" + PREFIX_CHECKPOINT_DIR + r"\-(\d+)$")


def get_last_checkpoint(folder):
    content = os.listdir(folder)
    checkpoints = [
        path
        for path in content
        if _re_checkpoint.search(path) is not None and os.path.isdir(os.path.join(folder, path))
    ]
    if len(checkpoints) == 0:
        return
    return os.path.join(folder, max(checkpoints, key=lambda x: int(_re_checkpoint.search(x).groups()[0])))

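
# Illustrative sketch: resuming from the most recent `checkpoint-<step>`
# directory inside an output folder, if one exists. `output_dir` here is a
# hypothetical path used only for this example.
def _example_resume_from_last_checkpoint(output_dir: str) -> Optional[str]:
    last_checkpoint = None
    if os.path.isdir(output_dir):
        last_checkpoint = get_last_checkpoint(output_dir)
    if last_checkpoint is not None:
        logger.info(f"Resuming training from {last_checkpoint}")
    return last_checkpoint
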
class IntervalStrategy(ExplicitEnum):
    NO = "no"
    STEPS = "steps"
    EPOCH = "epoch"

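
# Illustrative sketch: because the strategies derive from `ExplicitEnum`, a
# plain string from a config or CLI flag can be converted directly, and an
# invalid value raises a ValueError listing the accepted options.
def _example_parse_interval_strategy(value: str = "steps") -> IntervalStrategy:
    # IntervalStrategy("weekly") would raise a ValueError naming "no"/"steps"/"epoch".
    return IntervalStrategy(value)
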
class EvaluationStrategy(ExplicitEnum):
    NO = "no"
    STEPS = "steps"
    EPOCH = "epoch"


class OptimizerNames(ExplicitEnum):
    """
    Stores the acceptable string identifiers for optimizers.
    """

    ADAMW = "adamw"
    ADAFACTOR = "adafactor"


class ShardingOption(ExplicitEnum):
    """
    Sharding Option:
    - OP for sharding optimizer state
    - GRAD for sharding gradients
    - FULL_SHARD for sharding optimizer state, gradients and parameters
    - OFFLOAD means offload to cpu.
    """

    SHARD_OP = "stage1"
    SHARD_GRAD_OP = "stage2"
    FULL_SHARD = "stage3"
    # NO_SHARD = "no"
    OFFLOAD = "offload"


def is_main_process(local_rank):
    """
    Whether or not the current process is the local main process, based on `local_rank`.
    """
    return local_rank in [-1, 0]


def total_processes_number(local_rank):
    """
    Return the number of processes launched in parallel. Works with `paddle.distributed`.
    """
    if local_rank != -1:
        return paddle.distributed.get_world_size()
    return 1

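
# Illustrative sketch: `is_main_process` is commonly used to make sure side
# effects such as logging or writing files happen once rather than once per
# worker. A `local_rank` of -1 means no distributed launch.
def _example_log_on_main_process(local_rank: int, message: str) -> int:
    if is_main_process(local_rank):
        logger.info(message)
    # Every rank can still query how many processes were launched in total.
    return total_processes_number(local_rank)
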
def speed_metrics(split, start_time, num_samples=None, num_steps=None):
    """
    Measure and return speed performance metrics.

    This function requires a time snapshot `start_time` before the operation to be measured starts and this function
    should be run immediately after the operation to be measured has completed.

    Args:
    - split: name to prefix metric (like train, eval, test...)
    - start_time: operation start time
    - num_samples: number of samples processed
    - num_steps: number of steps processed
    """
    runtime = time.time() - start_time
    result = {f"{split}_runtime": round(runtime, 4)}
    if num_samples is not None:
        samples_per_second = num_samples / runtime
        result[f"{split}_samples_per_second"] = round(samples_per_second, 3)
    if num_steps is not None:
        steps_per_second = num_steps / runtime
        result[f"{split}_steps_per_second"] = round(steps_per_second, 3)
    return result

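
# Illustrative sketch: take a time snapshot, run the operation being measured,
# then hand the snapshot to `speed_metrics`. `run_evaluation` is a hypothetical
# callable standing in for the code being timed.
def _example_speed_metrics(run_evaluation, num_samples: int, num_steps: int) -> Dict[str, float]:
    start_time = time.time()
    run_evaluation()
    # Produces e.g. {"eval_runtime": ..., "eval_samples_per_second": ...,
    # "eval_steps_per_second": ...}, keyed by the "eval" prefix.
    return speed_metrics("eval", start_time, num_samples=num_samples, num_steps=num_steps)
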
class SchedulerType(ExplicitEnum):
    LINEAR = "linear"
    COSINE = "cosine"
    CONSTANT = "constant"
    CONSTANT_WITH_WARMUP = "constant_with_warmup"

def get_constant_schedule(learning_rate: float, last_epoch: int = -1):
    """
    Create a schedule with a constant learning rate, using the learning rate set in the optimizer.

    Args:
        learning_rate (`float`):
            The initial learning rate.
        last_epoch (`int`, *optional*, defaults to -1):
            The index of the last epoch when resuming training.

    Return:
        `paddle.optimizer.lr.LambdaDecay` with the appropriate schedule.
    """
    return LambdaDecay(learning_rate, lambda _: 1, last_epoch=last_epoch)


def get_constant_schedule_with_warmup(learning_rate: float, num_warmup_steps: int, last_epoch: int = -1):
    """
    Create a schedule with a constant learning rate preceded by a warmup period during which the learning rate
    increases linearly between 0 and the initial lr set in the optimizer.

    Args:
        learning_rate (`float`):
            The initial learning rate.
        num_warmup_steps (`int`):
            The number of steps for the warmup phase.
        last_epoch (`int`, *optional*, defaults to -1):
            The index of the last epoch when resuming training.

    Return:
        `paddle.optimizer.lr.LambdaDecay` with the appropriate schedule.
    """

    def lr_lambda(current_step: int):
        if current_step < num_warmup_steps:
            return float(current_step) / float(max(1.0, num_warmup_steps))
        return 1.0

    return LambdaDecay(learning_rate, lr_lambda, last_epoch=last_epoch)


def get_linear_schedule_with_warmup(learning_rate: float, num_warmup_steps, num_training_steps, last_epoch=-1):
    """
    Create a schedule with a learning rate that decreases linearly from the initial lr set in the optimizer to 0,
    after a warmup period during which it increases linearly from 0 to the initial lr set in the optimizer.

    Args:
        learning_rate (`float`):
            The initial learning rate.
        num_warmup_steps (`int`):
            The number of steps for the warmup phase.
        num_training_steps (`int`):
            The total number of training steps.
        last_epoch (`int`, *optional*, defaults to -1):
            The index of the last epoch when resuming training.

    Return:
        `paddle.optimizer.lr.LambdaDecay` with the appropriate schedule.
    """

    def lr_lambda(current_step: int):
        if current_step < num_warmup_steps:
            return float(current_step) / float(max(1, num_warmup_steps))
        return max(
            0.0, float(num_training_steps - current_step) / float(max(1, num_training_steps - num_warmup_steps))
        )

    return LambdaDecay(learning_rate, lr_lambda, last_epoch)


def get_cosine_schedule_with_warmup(
    learning_rate: float, num_warmup_steps: int, num_training_steps: int, num_cycles: float = 0.5, last_epoch: int = -1
):
    """
    Create a schedule with a learning rate that decreases following the values of the cosine function between the
    initial lr set in the optimizer to 0, after a warmup period during which it increases linearly between 0 and the
    initial lr set in the optimizer.

    Args:
        learning_rate (`float`):
            The initial learning rate.
        num_warmup_steps (`int`):
            The number of steps for the warmup phase.
        num_training_steps (`int`):
            The total number of training steps.
        num_cycles (`float`, *optional*, defaults to 0.5):
            The number of waves in the cosine schedule (the default is to just decrease from the max value to 0
            following a half-cosine).
        last_epoch (`int`, *optional*, defaults to -1):
            The index of the last epoch when resuming training.

    Return:
        `paddle.optimizer.lr.LambdaDecay` with the appropriate schedule.
    """

    def lr_lambda(current_step):
        if current_step < num_warmup_steps:
            return float(current_step) / float(max(1, num_warmup_steps))
        progress = float(current_step - num_warmup_steps) / float(max(1, num_training_steps - num_warmup_steps))
        return max(0.0, 0.5 * (1.0 + math.cos(math.pi * float(num_cycles) * 2.0 * progress)))

    return LambdaDecay(learning_rate, lr_lambda, last_epoch)


TYPE_TO_SCHEDULER_FUNCTION = {
    SchedulerType.LINEAR: get_linear_schedule_with_warmup,
    SchedulerType.COSINE: get_cosine_schedule_with_warmup,
    SchedulerType.CONSTANT: get_constant_schedule,
    SchedulerType.CONSTANT_WITH_WARMUP: get_constant_schedule_with_warmup,
}

def get_scheduler(
    name: Union[str, SchedulerType],
    learning_rate: float,
    num_warmup_steps: Optional[int] = None,
    num_training_steps: Optional[int] = None,
):
    """
    Unified API to get any scheduler from its name.

    Args:
        name (`str` or `SchedulerType`):
            The name of the scheduler to use.
        learning_rate (`float`):
            The initial learning rate.
        num_warmup_steps (`int`, *optional*):
            The number of warmup steps to do. This is not required by all schedulers (hence the argument being
            optional); the function will raise an error if it's unset and the scheduler type requires it.
        num_training_steps (`int`, *optional*):
            The number of training steps to do. This is not required by all schedulers (hence the argument being
            optional); the function will raise an error if it's unset and the scheduler type requires it.
    """
    name = SchedulerType(name)
    schedule_func = TYPE_TO_SCHEDULER_FUNCTION[name]
    if name == SchedulerType.CONSTANT:
        return schedule_func(learning_rate)

    # All other schedulers require `num_warmup_steps`
    if num_warmup_steps is None:
        raise ValueError(f"{name} requires `num_warmup_steps`, please provide that argument.")

    if name == SchedulerType.CONSTANT_WITH_WARMUP:
        return schedule_func(learning_rate, num_warmup_steps=num_warmup_steps)

    # All other schedulers require `num_training_steps`
    if num_training_steps is None:
        raise ValueError(f"{name} requires `num_training_steps`, please provide that argument.")

    return schedule_func(learning_rate, num_warmup_steps=num_warmup_steps, num_training_steps=num_training_steps)

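
# Illustrative sketch: selecting a schedule by name with `get_scheduler` and
# wiring it into a Paddle optimizer. The learning rate and step counts are
# placeholder example values; any `paddle.nn.Layer` would do for `model`.
def _example_get_scheduler(model: paddle.nn.Layer):
    lr_scheduler = get_scheduler(
        "linear", learning_rate=5e-5, num_warmup_steps=100, num_training_steps=1000
    )
    optimizer = paddle.optimizer.AdamW(learning_rate=lr_scheduler, parameters=model.parameters())
    # Inside a training loop one would call:
    #   optimizer.step(); lr_scheduler.step(); optimizer.clear_grad()
    return optimizer, lr_scheduler
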
def _secs2timedelta(secs):
    """
    Convert seconds to hh:mm:ss.msec, msecs rounded to 2 decimals.
    """
    msec = int(abs(secs - int(secs)) * 100)
    return f"{datetime.timedelta(seconds=int(secs))}.{msec:02d}"


def metrics_format(self, metrics: Dict[str, float]) -> Dict[str, float]:
    """
    Reformat Trainer metrics values to a human-readable format.

    Args:
        metrics (`Dict[str, float]`):
            The metrics returned from train/evaluate/predict

    Returns:
        metrics (`Dict[str, float]`): The reformatted metrics
    """
    metrics_copy = metrics.copy()
    for k, v in metrics_copy.items():
        if "_mem_" in k:
            metrics_copy[k] = f"{ v >> 20 }MB"
        elif "_runtime" in k:
            metrics_copy[k] = _secs2timedelta(v)
        elif k == "total_flos":
            metrics_copy[k] = f"{ int(v) >> 30 }GF"
        elif isinstance(metrics_copy[k], float):
            metrics_copy[k] = round(v, 4)
    return metrics_copy


def log_metrics(self, split, metrics):
    """
    Log metrics in a specially formatted way.

    Under distributed environment this is done only for a process with rank 0.

    Args:
        split (`str`):
            Mode/split name: one of `train`, `eval`, `test`
        metrics (`Dict[str, float]`):
            The metrics returned from train/evaluate/predict
    """
    if not self.is_world_process_zero():
        return

    logger.info(f"***** {split} metrics *****")
    metrics_formatted = self.metrics_format(metrics)
    k_width = max(len(str(x)) for x in metrics_formatted.keys())
    v_width = max(len(str(x)) for x in metrics_formatted.values())
    for key in sorted(metrics_formatted.keys()):
        logger.info(f"  {key: <{k_width}} = {metrics_formatted[key]:>{v_width}}")


def save_metrics(self, split, metrics, combined=True):
    """
    Save metrics into a json file for that split, e.g. `train_results.json`.

    Under distributed environment this is done only for a process with rank 0.

    Args:
        split (`str`):
            Mode/split name: one of `train`, `eval`, `test`, `all`
        metrics (`Dict[str, float]`):
            The metrics returned from train/evaluate/predict
        combined (`bool`, *optional*, defaults to `True`):
            Creates combined metrics by updating `all_results.json` with metrics of this call

    To understand the metrics please read the docstring of [`~Trainer.log_metrics`]. The only difference is that raw
    unformatted numbers are saved in the current method.
    """
    if not self.is_world_process_zero():
        return

    path = os.path.join(self.args.output_dir, f"{split}_results.json")
    with open(path, "w") as f:
        json.dump(metrics, f, indent=4, sort_keys=True)

    if combined:
        path = os.path.join(self.args.output_dir, "all_results.json")
        if os.path.exists(path):
            with open(path, "r") as f:
                all_metrics = json.load(f)
        else:
            all_metrics = {}

        all_metrics.update(metrics)
        with open(path, "w") as f:
            json.dump(all_metrics, f, indent=4, sort_keys=True)


def save_state(self):
    """
    Saves the Trainer state, since Trainer.save_model saves only the tokenizer with the model.

    Under distributed environment this is done only for a process with rank 0.
    """
    if not self.is_world_process_zero():
        return

    path = os.path.join(self.args.output_dir, "trainer_state.json")
    self.state.save_to_json(path)


def has_length(dataset):
    """
    Checks if the dataset implements __len__() and it doesn't raise an error.
    """
    try:
        return len(dataset) is not None
    except (TypeError, ValueError, RuntimeError):
        # TypeError: len() of unsized object
        return False


class IterableDatasetShard(IterableDataset):
    """
    Wraps a Paddle `IterableDataset` to generate samples for one of the processes only. Instances of this class will
    always yield a number of samples that is a round multiple of the actual batch size (which is `batch_size x
    num_processes`). Depending on the value of the `drop_last` attribute, it will either stop the iteration at the
    first batch that would be too small or loop with indices from the beginning.

    On two processes with an iterable dataset yielding `[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]` with a batch size of
    2:

    - the shard on process 0 will yield `[0, 1, 4, 5, 8, 9]` so will see batches `[0, 1]`, `[4, 5]`, `[8, 9]`
    - the shard on process 1 will yield `[2, 3, 6, 7, 10, 11]` so will see batches `[2, 3]`, `[6, 7]`, `[10, 11]`

    Args:
        dataset (`paddle.io.IterableDataset`):
            The dataset to split in several shards.
        batch_size (`int`, *optional*, defaults to 1):
            The size of the batches per shard.
        drop_last (`bool`, *optional*, defaults to `False`):
            Whether or not to drop the last incomplete batch or complete the last batches by using the samples from
            the beginning.
        num_processes (`int`, *optional*, defaults to 1):
            The number of processes running concurrently.
        process_index (`int`, *optional*, defaults to 0):
            The index of the current process.
        seed (`int`, *optional*, defaults to 0):
            A random seed that will be used for the random number generation in
            [`~trainer_utils.IterableDatasetShard.set_epoch`].
    """

    def __init__(
        self,
        dataset: IterableDataset,
        batch_size: int = 1,
        drop_last: bool = False,
        num_processes: int = 1,
        process_index: int = 0,
        seed: int = 0,
    ):
        self.dataset = dataset
        self.batch_size = batch_size
        self.drop_last = drop_last
        self.num_processes = num_processes
        self.process_index = process_index
        self.seed = seed
        self.epoch = 0
        self.num_examples = 0

    def set_epoch(self, epoch):
        self.epoch = epoch
        if hasattr(self.dataset, "set_epoch"):
            self.dataset.set_epoch(epoch)

    def __iter__(self):
        self.num_examples = 0
        # TODO: support generator seed in sampling.
        #
        # if (
        #     not hasattr(self.dataset, "set_epoch")
        #     and hasattr(self.dataset, "generator")
        #     and isinstance(self.dataset.generator, paddle.fluid.Generator)
        # ):
        #     self.dataset.generator.manual_seed(self.seed + self.epoch)
        real_batch_size = self.batch_size * self.num_processes
        process_slice = range(self.process_index * self.batch_size, (self.process_index + 1) * self.batch_size)

        first_batch = None
        current_batch = []
        for element in self.dataset:
            self.num_examples += 1
            current_batch.append(element)
            # Wait to have a full batch before yielding elements.
            if len(current_batch) == real_batch_size:
                for i in process_slice:
                    yield current_batch[i]
                if first_batch is None:
                    first_batch = current_batch.copy()
                current_batch = []

        # Finished if drop_last is True, otherwise complete the last batch with elements from the beginning.
        if not self.drop_last and len(current_batch) > 0:
            if first_batch is None:
                first_batch = current_batch.copy()
            while len(current_batch) < real_batch_size:
                current_batch += first_batch
            for i in process_slice:
                yield current_batch[i]

    def __len__(self):
        # Will raise an error if the underlying dataset is not sized.
        if self.drop_last:
            return (len(self.dataset) // (self.batch_size * self.num_processes)) * self.batch_size
        else:
            return math.ceil(len(self.dataset) / (self.batch_size * self.num_processes)) * self.batch_size


def find_batch_size(tensors):
    """
    Find the first dimension of a tensor in a nested list/tuple/dict of tensors.
    """
    if isinstance(tensors, (list, tuple)):
        for t in tensors:
            result = find_batch_size(t)
            if result is not None:
                return result
    elif isinstance(tensors, (dict, BatchEncoding)):
        for key, value in tensors.items():
            result = find_batch_size(value)
            if result is not None:
                return result
    elif isinstance(tensors, paddle.Tensor):
        return tensors.shape[0] if len(tensors.shape) >= 1 else None
    elif isinstance(tensors, np.ndarray):
        return tensors.shape[0] if len(tensors.shape) >= 1 else None


class RemoveColumnsCollator:
    """Wrap the data collator to remove unused columns before they are passed to the collator."""

    def __init__(
        self,
        data_collator,
        signature_columns,
        logger=None,
        model_name: Optional[str] = None,
        description: Optional[str] = None,
    ):
        self.data_collator = data_collator
        self.signature_columns = signature_columns
        self.logger = logger
        self.description = description
        self.model_name = model_name
        self.message_logged = False

    def _remove_columns(self, feature: dict) -> dict:
        if not isinstance(feature, dict):
            return feature
        if not self.message_logged and self.logger and self.model_name:
            ignored_columns = list(set(feature.keys()) - set(self.signature_columns))
            if len(ignored_columns) > 0:
                dset_description = "" if self.description is None else f"in the {self.description} set"
                self.logger.info(
                    f"The following columns {dset_description} don't have a corresponding argument in "
                    f"`{self.model_name}.forward` and have been ignored: {', '.join(ignored_columns)}."
                    f" If {', '.join(ignored_columns)} are not expected by `{self.model_name}.forward`, "
                    " you can safely ignore this message."
                )
                self.message_logged = True
        return {k: v for k, v in feature.items() if k in self.signature_columns}

    def __call__(self, features: List[dict]):
        features = [self._remove_columns(feature) for feature in features]
        return self.data_collator(features)
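

# Illustrative sketch: reproducing the sharding behaviour described in the
# `IterableDatasetShard` docstring with a small in-memory iterable dataset.
# `_RangeIterable` is a helper defined only for this example.
class _RangeIterable(IterableDataset):
    def __init__(self, n: int):
        self.n = n

    def __iter__(self):
        return iter(range(self.n))


def _example_iterable_dataset_shard():
    # With 12 samples, batch_size=2 and two processes, process 0 yields
    # [0, 1, 4, 5, 8, 9] and process 1 yields [2, 3, 6, 7, 10, 11].
    shard0 = IterableDatasetShard(_RangeIterable(12), batch_size=2, num_processes=2, process_index=0)
    shard1 = IterableDatasetShard(_RangeIterable(12), batch_size=2, num_processes=2, process_index=1)
    return list(shard0), list(shard1)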