paddlenlp.trainer.trainer_utils 源代码

# Copyright 2020-present the HuggingFace Inc. team.
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This file is modified from
#  https://github.com/huggingface/transformers/blob/main/src/transformers/trainer_utils.py
"""
Utilities for the Trainer class. 
"""
import datetime
import json
import os
import random
import re
import time
from enum import Enum
from typing import Dict, NamedTuple, Optional, Tuple, Union

import numpy as np
from paddle.optimizer.lr import LambdaDecay
from ..utils.log import logger

__all__ = [
    "TrainOutput",
    "PredictionOutput",
    "EvalPrediction",
    "IntervalStrategy",
    "SchedulerType",
    "set_seed",
    "speed_metrics",
    "get_last_checkpoint",
    "get_scheduler",
]


def set_seed(seed: int):
    import paddle
    random.seed(seed)
    np.random.seed(seed)
    paddle.seed(seed)


class ExplicitEnum(Enum):
    """
    Enum with more explicit error message for missing values.
    """

    @classmethod
    def _missing_(cls, value):
        raise ValueError(
            f"{value} is not a valid {cls.__name__}, please select one of {list(cls._value2member_map_.keys())}"
        )


[文档]class EvalPrediction(NamedTuple): """ Evaluation output (always contains labels), to be used to compute metrics. Parameters: predictions (`np.ndarray`): Predictions of the model. label_ids (`np.ndarray`): Targets to be matched. """ predictions: Union[np.ndarray, Tuple[np.ndarray]] label_ids: Union[np.ndarray, Tuple[np.ndarray]]
class EvalLoopOutput(NamedTuple): predictions: Union[np.ndarray, Tuple[np.ndarray]] label_ids: Optional[Union[np.ndarray, Tuple[np.ndarray]]] metrics: Optional[Dict[str, float]] num_samples: Optional[int]
[文档]class PredictionOutput(NamedTuple): predictions: Union[np.ndarray, Tuple[np.ndarray]] label_ids: Optional[Union[np.ndarray, Tuple[np.ndarray]]] metrics: Optional[Dict[str, float]]
[文档]class TrainOutput(NamedTuple): global_step: int training_loss: float metrics: Dict[str, float]
PREFIX_CHECKPOINT_DIR = "checkpoint" _re_checkpoint = re.compile(r"^" + PREFIX_CHECKPOINT_DIR + r"\-(\d+)$") def get_last_checkpoint(folder): content = os.listdir(folder) checkpoints = [ path for path in content if _re_checkpoint.search(path) is not None and os.path.isdir(os.path.join(folder, path)) ] if len(checkpoints) == 0: return return os.path.join( folder, max(checkpoints, key=lambda x: int(_re_checkpoint.search(x).groups()[0])))
[文档]class IntervalStrategy(ExplicitEnum): NO = "no" STEPS = "steps" EPOCH = "epoch"
class EvaluationStrategy(ExplicitEnum): NO = "no" STEPS = "steps" EPOCH = "epoch" class OptimizerNames(ExplicitEnum): """ Stores the acceptable string identifiers for optimizers. """ ADAMW = "adamw" ADAFACTOR = "adafactor" def is_main_process(local_rank): """ Whether or not the current process is the local process, based on `xm.get_ordinal()` (for TPUs) first, then on `local_rank`. """ return local_rank in [-1, 0] def total_processes_number(local_rank): """ Return the number of processes launched in parallel. Works with `paddle.distributed` and TPUs. """ if local_rank != -1: import paddle return paddle.distributed.get_world_size() return 1
[文档]def speed_metrics(split, start_time, num_samples=None, num_steps=None): """ Measure and return speed performance metrics. This function requires a time snapshot `start_time` before the operation to be measured starts and this function should be run immediately after the operation to be measured has completed. Args: - split: name to prefix metric (like train, eval, test...) - start_time: operation start time - num_samples: number of samples processed """ runtime = time.time() - start_time result = {f"{split}_runtime": round(runtime, 4)} if num_samples is not None: samples_per_second = num_samples / runtime result[f"{split}_samples_per_second"] = round(samples_per_second, 3) if num_steps is not None: steps_per_second = num_steps / runtime result[f"{split}_steps_per_second"] = round(steps_per_second, 3) return result
[文档]class SchedulerType(ExplicitEnum): LINEAR = "linear" COSINE = "cosine" CONSTANT = "constant" CONSTANT_WITH_WARMUP = "constant_with_warmup"
def get_constant_schedule(learning_rate: float, last_epoch: int = -1): """ Create a schedule with a constant learning rate, using the learning rate set in optimizer. Args: learning_rate (float) The initial learning rate. It is a python float number. last_epoch (`int`, *optional*, defaults to -1): The index of the last epoch when resuming training. Return: `paddle.optimizer.lr.LambdaDecay` with the appropriate schedule. """ return LambdaDecay(learning_rate, lambda _: 1, last_epoch=last_epoch) def get_constant_schedule_with_warmup(learning_rate: float, num_warmup_steps: int, last_epoch: int = -1): """ Create a schedule with a constant learning rate preceded by a warmup period during which the learning rate increases linearly between 0 and the initial lr set in the optimizer. Args: learning_rate (float) The initial learning rate. It is a python float number. num_warmup_steps (`int`): The number of steps for the warmup phase. last_epoch (`int`, *optional*, defaults to -1): The index of the last epoch when resuming training. Return: `paddle.optimizer.lr.LambdaDecay` with the appropriate schedule. """ def lr_lambda(current_step: int): if current_step < num_warmup_steps: return float(current_step) / float(max(1.0, num_warmup_steps)) return 1.0 return LambdaDecay(learning_rate, lr_lambda, last_epoch=last_epoch) def get_linear_schedule_with_warmup(learning_rate: float, num_warmup_steps, num_training_steps, last_epoch=-1): """ Create a schedule with a learning rate that decreases linearly from the initial lr set in the optimizer to 0, after a warmup period during which it increases linearly from 0 to the initial lr set in the optimizer. Args: learning_rate (float) The initial learning rate. It is a python float number. num_warmup_steps (`int`): The number of steps for the warmup phase. num_training_steps (`int`): The total number of training steps. last_epoch (`int`, *optional*, defaults to -1): The index of the last epoch when resuming training. Return: `paddle.optimizer.lr.LambdaDecay` with the appropriate schedule. """ def lr_lambda(current_step: int): if current_step < num_warmup_steps: return float(current_step) / float(max(1, num_warmup_steps)) return max( 0.0, float(num_training_steps - current_step) / float(max(1, num_training_steps - num_warmup_steps))) return LambdaDecay(learning_rate, lr_lambda, last_epoch) def get_cosine_schedule_with_warmup(learning_rate: float, num_warmup_steps: int, num_training_steps: int, num_cycles: float = 0.5, last_epoch: int = -1): """ Create a schedule with a learning rate that decreases following the values of the cosine function between the initial lr set in the optimizer to 0, after a warmup period during which it increases linearly between 0 and the initial lr set in the optimizer. Args: learning_rate (float) The initial learning rate. It is a python float number. num_warmup_steps (`int`): The number of steps for the warmup phase. num_training_steps (`int`): The total number of training steps. num_cycles (`float`, *optional*, defaults to 0.5): The number of waves in the cosine schedule (the defaults is to just decrease from the max value to 0 following a half-cosine). last_epoch (`int`, *optional*, defaults to -1): The index of the last epoch when resuming training. Return: `paddle.optimizer.lr.LambdaDecay` with the appropriate schedule. """ def lr_lambda(current_step): if current_step < num_warmup_steps: return float(current_step) / float(max(1, num_warmup_steps)) progress = float(current_step - num_warmup_steps) / float( max(1, num_training_steps - num_warmup_steps)) return max( 0.0, 0.5 * (1.0 + math.cos(math.pi * float(num_cycles) * 2.0 * progress))) return LambdaDecay(learning_rate, lr_lambda, last_epoch) TYPE_TO_SCHEDULER_FUNCTION = { SchedulerType.LINEAR: get_linear_schedule_with_warmup, SchedulerType.COSINE: get_cosine_schedule_with_warmup, SchedulerType.CONSTANT: get_constant_schedule, SchedulerType.CONSTANT_WITH_WARMUP: get_constant_schedule_with_warmup, }
[文档]def get_scheduler( name: Union[str, SchedulerType], learning_rate: float, num_warmup_steps: Optional[int] = None, num_training_steps: Optional[int] = None, ): """ Unified API to get any scheduler from its name. Args: name (`str` or `SchedulerType`): The name of the scheduler to use. learning_rate (float) The initial learning rate. It is a python float number. num_warmup_steps (`int`, *optional*): The number of warmup steps to do. This is not required by all schedulers (hence the argument being optional), the function will raise an error if it's unset and the scheduler type requires it. num_training_steps (`int``, *optional*): The number of training steps to do. This is not required by all schedulers (hence the argument being optional), the function will raise an error if it's unset and the scheduler type requires it. """ name = SchedulerType(name) schedule_func = TYPE_TO_SCHEDULER_FUNCTION[name] if name == SchedulerType.CONSTANT: return schedule_func(learning_rate) # All other schedulers require `num_warmup_steps` if num_warmup_steps is None: raise ValueError( f"{name} requires `num_warmup_steps`, please provide that argument." ) if name == SchedulerType.CONSTANT_WITH_WARMUP: return schedule_func(learning_rate, num_warmup_steps=num_warmup_steps) # All other schedulers require `num_training_steps` if num_training_steps is None: raise ValueError( f"{name} requires `num_training_steps`, please provide that argument." ) return schedule_func(learning_rate, num_warmup_steps=num_warmup_steps, num_training_steps=num_training_steps)
def _secs2timedelta(secs): """ convert seconds to hh:mm:ss.msec, msecs rounded to 2 decimals """ msec = int(abs(secs - int(secs)) * 100) return f"{datetime.timedelta(seconds=int(secs))}.{msec:02d}" def metrics_format(self, metrics: Dict[str, float]) -> Dict[str, float]: """ Reformat Trainer metrics values to a human-readable format Args: metrics (`Dict[str, float]`): The metrics returned from train/evaluate/predict Returns: metrics (`Dict[str, float]`): The reformatted metrics """ metrics_copy = metrics.copy() for k, v in metrics_copy.items(): if "_mem_" in k: metrics_copy[k] = f"{ v >> 20 }MB" elif "_runtime" in k: metrics_copy[k] = _secs2timedelta(v) elif k == "total_flos": metrics_copy[k] = f"{ int(v) >> 30 }GF" elif isinstance(metrics_copy[k], float): metrics_copy[k] = round(v, 4) return metrics_copy def log_metrics(self, split, metrics): """ Log metrics in a specially formatted way Under distributed environment this is done only for a process with rank 0. Args: split (`str`): Mode/split name: one of `train`, `eval`, `test` metrics (`Dict[str, float]`): The metrics returned from train/evaluate/predictmetrics: metrics dict """ if not self.is_world_process_zero(): return logger.info(f"***** {split} metrics *****") metrics_formatted = self.metrics_format(metrics) k_width = max(len(str(x)) for x in metrics_formatted.keys()) v_width = max(len(str(x)) for x in metrics_formatted.values()) for key in sorted(metrics_formatted.keys()): logger.info( f" {key: <{k_width}} = {metrics_formatted[key]:>{v_width}}") def save_metrics(self, split, metrics, combined=True): """ Save metrics into a json file for that split, e.g. `train_results.json`. Under distributed environment this is done only for a process with rank 0. Args: split (`str`): Mode/split name: one of `train`, `eval`, `test`, `all` metrics (`Dict[str, float]`): The metrics returned from train/evaluate/predict combined (`bool`, *optional*, defaults to `True`): Creates combined metrics by updating `all_results.json` with metrics of this call To understand the metrics please read the docstring of [`~Trainer.log_metrics`]. The only difference is that raw unformatted numbers are saved in the current method. """ if not self.is_world_process_zero(): return path = os.path.join(self.args.output_dir, f"{split}_results.json") with open(path, "w") as f: json.dump(metrics, f, indent=4, sort_keys=True) if combined: path = os.path.join(self.args.output_dir, "all_results.json") if os.path.exists(path): with open(path, "r") as f: all_metrics = json.load(f) else: all_metrics = {} all_metrics.update(metrics) with open(path, "w") as f: json.dump(all_metrics, f, indent=4, sort_keys=True) def save_state(self): """ Saves the Trainer state, since Trainer.save_model saves only the tokenizer with the model Under distributed environment this is done only for a process with rank 0. """ if not self.is_world_process_zero(): return path = os.path.join(self.args.output_dir, "trainer_state.json") self.state.save_to_json(path) def has_length(dataset): """ Checks if the dataset implements __len__() and it doesn't raise an error """ try: return len(dataset) is not None except TypeError: # TypeError: len() of unsized object return False def get_last_checkpoint(folder): content = os.listdir(folder) checkpoints = [ path for path in content if _re_checkpoint.search(path) is not None and os.path.isdir(os.path.join(folder, path)) ] if len(checkpoints) == 0: return return os.path.join( folder, max(checkpoints, key=lambda x: int(_re_checkpoint.search(x).groups()[0])))