paddlenlp.transformers.ernie_gen.modeling 源代码

#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import io
import json
import os

import paddle
import six
from paddle import nn
from paddle.nn import functional as F

from paddlenlp.transformers import (
from import resolve_file_path
from paddlenlp.utils.log import logger

from .. import PretrainedModel, register_base_model
from ..utils import InitTrackerMeta, fn_args_to_dict

__all__ = ["ErnieGenPretrainedModel", "ErnieForGeneration", "ErnieGenModel"]

def _build_linear(n_in, n_out, name, init):
    return nn.Linear(
        weight_attr=paddle.ParamAttr(name="%s.w_0" % name if name is not None else None, initializer=init),
        bias_attr="%s.b_0" % name if name is not None else None,

def _build_ln(n_in, name):
    return nn.LayerNorm(
            name="%s_layer_norm_scale" % name if name is not None else None, initializer=nn.initializer.Constant(1.0)
            name="%s_layer_norm_bias" % name if name is not None else None, initializer=nn.initializer.Constant(1.0)

def append_name(name, postfix):
    if name is None:
        ret = None
    elif name == "":
        ret = postfix
        ret = "%s_%s" % (name, postfix)
    return ret

class AttentionLayer(nn.Layer):
    def __init__(self, cfg, name=None):
        super(AttentionLayer, self).__init__()
        initializer = nn.initializer.TruncatedNormal(std=cfg["initializer_range"])
        d_model = cfg["hidden_size"]
        n_head = cfg["num_attention_heads"]
        assert d_model % n_head == 0
        d_model_q = cfg.get("query_hidden_size_per_head", d_model // n_head) * n_head
        d_model_v = cfg.get("value_hidden_size_per_head", d_model // n_head) * n_head
        self.n_head = n_head
        self.d_key = d_model_q // n_head
        self.q = _build_linear(d_model, d_model_q, append_name(name, "query_fc"), initializer)
        self.k = _build_linear(d_model, d_model_q, append_name(name, "key_fc"), initializer)
        self.v = _build_linear(d_model, d_model_v, append_name(name, "value_fc"), initializer)
        self.o = _build_linear(d_model_v, d_model, append_name(name, "output_fc"), initializer)
        self.dropout = nn.Dropout(p=cfg["attention_probs_dropout_prob"])

    def forward(self, queries, keys, values, attn_bias, past_cache):
        assert len(queries.shape) == len(keys.shape) == len(values.shape) == 3
        # bsz, q_len, q_dim = queries.shape
        # bsz, k_len, k_dim = keys.shape
        # bsz, v_len, v_dim = values.shape
        # assert k_len == v_len

        q = self.q(queries)
        k = self.k(keys)
        v = self.v(values)

        cache = (k, v)
        if past_cache is not None:
            cached_k, cached_v = past_cache
            k = paddle.concat([cached_k, k], 1)
            v = paddle.concat([cached_v, v], 1)

        q = q.reshape([0, 0, self.n_head, q.shape[-1] // self.n_head]).transpose(
            [0, 2, 1, 3]
        )  # [batch, head, seq, dim]
        k = k.reshape([0, 0, self.n_head, k.shape[-1] // self.n_head]).transpose(
            [0, 2, 1, 3]
        )  # [batch, head, seq, dim]
        v = v.reshape([0, 0, self.n_head, v.shape[-1] // self.n_head]).transpose(
            [0, 2, 1, 3]
        )  # [batch, head, seq, dim]

        q = q.scale(self.d_key**-0.5)
        score = q.matmul(k, transpose_y=True)
        if attn_bias is not None:
            score += attn_bias
        score = F.softmax(score)
        score = self.dropout(score)

        out = score.matmul(v).transpose([0, 2, 1, 3])
        out = out.reshape([0, 0, out.shape[2] * out.shape[3]])
        out = self.o(out)
        return out, cache

class PositionwiseFeedForwardLayer(nn.Layer):
    def __init__(self, cfg, name=None):
        super(PositionwiseFeedForwardLayer, self).__init__()
        initializer = nn.initializer.TruncatedNormal(std=cfg["initializer_range"])
        d_model = cfg["hidden_size"]
        d_ffn = cfg.get("intermediate_size", 4 * d_model)
        self.act = getattr(paddle.nn.functional, cfg["hidden_act"])
        self.i = _build_linear(
            append_name(name, "fc_0"),
        self.o = _build_linear(d_ffn, d_model, append_name(name, "fc_1"), initializer)
        prob = cfg.get("intermediate_dropout_prob", 0.0)
        self.dropout = nn.Dropout(p=prob)

    def forward(self, inputs):
        hidden = self.act(self.i(inputs))
        hidden = self.dropout(hidden)
        out = self.o(hidden)
        return out

class ErnieEncoderLayer(nn.Layer):
    def __init__(self, cfg, name=None):
        super(ErnieEncoderLayer, self).__init__()
        d_model = cfg["hidden_size"]
        self.attn = AttentionLayer(cfg, name=append_name(name, "multi_head_att"))
        self.ln1 = _build_ln(d_model, name=append_name(name, "post_att"))
        self.ffn = PositionwiseFeedForwardLayer(cfg, name=append_name(name, "ffn"))
        self.ln2 = _build_ln(d_model, name=append_name(name, "post_ffn"))
        prob = cfg.get("intermediate_dropout_prob", cfg["hidden_dropout_prob"])
        self.dropout = nn.Dropout(p=prob)

    def forward(self, inputs, attn_bias=None, past_cache=None):
        attn_out, cache = self.attn(inputs, inputs, inputs, attn_bias, past_cache=past_cache)  # self attn
        attn_out = self.dropout(attn_out)
        hidden = attn_out + inputs
        hidden = self.ln1(hidden)  # dropout/ add/ norm

        ffn_out = self.ffn(hidden)
        ffn_out = self.dropout(ffn_out)
        hidden = ffn_out + hidden
        hidden = self.ln2(hidden)
        return hidden, cache

class ErnieEncoderStack(nn.Layer):
    def __init__(self, cfg, name=None):
        super(ErnieEncoderStack, self).__init__()
        n_layers = cfg["num_hidden_layers"]
        self.block = nn.LayerList([ErnieEncoderLayer(cfg, append_name(name, "layer_%d" % i)) for i in range(n_layers)])

    def forward(self, inputs, attn_bias=None, past_cache=None):
        if past_cache is not None:
            assert isinstance(past_cache, tuple), "unknown type of `past_cache`, expect tuple or list. got %s" % repr(
            past_cache = list(zip(*past_cache))
            past_cache = [None] * len(self.block)
        cache_list_k, cache_list_v, hidden_list = [], [], [inputs]

        for b, p in zip(self.block, past_cache):
            inputs, cache = b(inputs, attn_bias=attn_bias, past_cache=p)
            cache_k, cache_v = cache

        return inputs, hidden_list, (cache_list_k, cache_list_v)

[文档]@six.add_metaclass(InitTrackerMeta) class ErnieGenPretrainedModel(PretrainedModel): r""" An abstract class for pretrained ErnieGen models. It provides ErnieGen related `model_config_file`, `pretrained_init_configuration`, `resource_files_names`, `pretrained_resource_files_map`, `base_model_prefix` for downloading and loading pretrained models. See :class:`~paddlenlp.transformers.model_utils.PretrainedModel` for more details. """ ernie_gen_pretrained_init_configuration = { "ernie-gen-base-en": { "attention_probs_dropout_prob": 0.1, "hidden_act": "gelu", "hidden_dropout_prob": 0.1, "hidden_size": 768, "initializer_range": 0.02, "intermediate_size": 3072, "max_position_embeddings": 1024, "num_attention_heads": 12, "num_hidden_layers": 12, "type_vocab_size": 4, "vocab_size": 30522, "pad_token_id": 0, }, "ernie-gen-large-en": { "attention_probs_dropout_prob": 0.1, "hidden_act": "gelu", "hidden_dropout_prob": 0.1, "hidden_size": 1024, "initializer_range": 0.02, "intermediate_size": 4096, "max_position_embeddings": 1024, "num_attention_heads": 16, "num_hidden_layers": 24, "type_vocab_size": 4, "vocab_size": 30522, "pad_token_id": 0, }, "ernie-gen-large-en-430g": { "attention_probs_dropout_prob": 0.1, "hidden_act": "gelu", "hidden_dropout_prob": 0.1, "hidden_size": 1024, "initializer_range": 0.02, "intermediate_size": 4096, "max_position_embeddings": 1024, "num_attention_heads": 16, "num_hidden_layers": 24, "type_vocab_size": 4, "vocab_size": 30522, "pad_token_id": 0, }, } ernie_gen_pretrained_resource_files_map = { "model_state": { "ernie-gen-base-en": "", "ernie-gen-large-en": "", "ernie-gen-large-en-430g": "", } } # Support more model to warm start. pretrained_init_configuration = { **ernie_gen_pretrained_init_configuration, **BertPretrainedModel.pretrained_init_configuration, **ElectraPretrainedModel.pretrained_init_configuration, **RobertaPretrainedModel.pretrained_init_configuration, **ErniePretrainedModel.pretrained_init_configuration, } pretrained_resource_files_map = { "model_state": { **ernie_gen_pretrained_resource_files_map["model_state"], **BertPretrainedModel.pretrained_resource_files_map["model_state"], **ElectraPretrainedModel.pretrained_resource_files_map["model_state"], **RobertaPretrainedModel.pretrained_resource_files_map["model_state"], **ErniePretrainedModel.pretrained_resource_files_map["model_state"], } }
[文档] @classmethod def from_pretrained(cls, pretrained_model_name_or_path, *args, **kwargs): pretrained_model_name_or_path = str(pretrained_model_name_or_path) cache_dir = kwargs.pop("cache_dir", None) from_hf_hub = kwargs.pop("from_hf_hub", False) from_aistudio = kwargs.pop("from_aistudio", False) subfolder = kwargs.pop("subfolder", "") pretrained_models = list(cls.pretrained_init_configuration.keys()) resource_files = {} init_configuration = {} if pretrained_model_name_or_path in pretrained_models: for file_id, map_list in cls.pretrained_resource_files_map.items(): resource_files[file_id] = map_list[pretrained_model_name_or_path] init_configuration = copy.deepcopy(cls.pretrained_init_configuration[pretrained_model_name_or_path]) else: if os.path.isdir(pretrained_model_name_or_path): for file_id, file_name in cls.resource_files_names.items(): full_file_name = os.path.join(pretrained_model_name_or_path, file_name) if os.path.isfile(full_file_name): resource_files[file_id] = full_file_name resource_files["model_config_file"] = os.path.join( pretrained_model_name_or_path, cls.model_config_file ) else: raise ValueError( "Calling {}.from_pretrained() with a model identifier or the " "path to a directory instead. The supported model " "identifiers are as follows: {}".format(cls.__name__, cls.pretrained_init_configuration.keys()) ) # default_root = os.path.join(MODEL_HOME, pretrained_model_name_or_path) resolved_resource_files = {} for file_id, file_path in resource_files.items(): if file_path is None or os.path.isfile(file_path): resolved_resource_files[file_id] = file_path continue resolved_resource_files[file_id] = resolve_file_path( pretrained_model_name_or_path, [file_path], subfolder, cache_dir=cache_dir, from_aistudio=from_aistudio, from_hf_hub=from_hf_hub, ) # Prepare model initialization kwargs # Did we saved some inputs and kwargs to reload ? model_config_file = resolved_resource_files.pop("model_config_file", None) if model_config_file is not None: with, encoding="utf-8") as f: init_kwargs = json.load(f) else: init_kwargs = init_configuration # position args are stored in kwargs, maybe better not include init_args = init_kwargs.pop("init_args", [{}])[0] if len(init_args) == 0: init_args = init_kwargs name_prefix = kwargs.pop("name", None) init_kwargs.pop("name", None) init_args.pop("name", None) model = cls(init_args, name=name_prefix) weight_path = resolved_resource_files["model_state"]"loading pretrained model from %s" % weight_path) if os.path.exists(weight_path): m = paddle.load(weight_path) params_name = list(m.keys()) if "mlm.weight" not in params_name: # ernie_gen is not implemented with paddle.transformer. # So, when loading the params saved by paddle.transformer, we should convert the params name. # We will update ernie_gen with paddle.transformer in the future. name_index_begin = params_name[0].index(".") + 1 for old_name in params_name: new_name = ( old_name[name_index_begin:] .replace("embeddings.word_embeddings", "word_emb") .replace("embeddings.position_embeddings", "pos_emb") .replace("embeddings.token_type_embeddings", "sent_emb") .replace("embeddings.layer_norm", "ln") .replace("encoder.layers", "encoder_stack.block") .replace("self_attn", "attn") .replace("k_proj", "k") .replace("q_proj", "q") .replace("v_proj", "v") .replace("out_proj", "o") .replace("linear1", "ffn.i") .replace("linear2", "ffn.o") .replace("norm1", "ln1") .replace("norm2", "ln2") .replace("pooler.dense", "pooler") ) m[new_name] = m.pop(old_name) for k, v in model.state_dict().items(): if k not in m:"param:%s not set in pretrained model, skip" % k) m[k] = v # FIXME: no need to do this in the future model.set_state_dict(m) else: raise ValueError("weight file not found in pretrain dir: %s" % weight_path) return model
def _post_init(self, original_init, *args, **kwargs): """ It would be hooked after `__init__` to add a dict including arguments of `__init__` as a attribute named `config` of the prtrained model instance. """ init_dict = fn_args_to_dict(original_init, *args, **kwargs) self.config = init_dict
@register_base_model class ErnieModel(ErnieGenPretrainedModel): def __init__(self, cfg, name=None): """ Fundamental pretrained Ernie model """ logger.debug("init ErnieModel with config: %s" % repr(cfg)) nn.Layer.__init__(self) d_model = cfg["hidden_size"] d_emb = cfg.get("emb_size", cfg["hidden_size"]) d_vocab = cfg["vocab_size"] d_pos = cfg["max_position_embeddings"] d_sent = cfg.get("sent_type_vocab_size") or cfg["type_vocab_size"] self.n_head = cfg["num_attention_heads"] self.return_additional_info = cfg.get("return_additional_info", False) initializer = nn.initializer.TruncatedNormal(std=cfg["initializer_range"]) self.ln = _build_ln(d_model, name=append_name(name, "pre_encoder")) self.word_emb = nn.Embedding( d_vocab, d_emb, weight_attr=paddle.ParamAttr(name=append_name(name, "word_embedding"), initializer=initializer), ) self.pos_emb = nn.Embedding( d_pos, d_emb, weight_attr=paddle.ParamAttr(name=append_name(name, "pos_embedding"), initializer=initializer), ) self.sent_emb = nn.Embedding( d_sent, d_emb, weight_attr=paddle.ParamAttr(name=append_name(name, "sent_embedding"), initializer=initializer), ) prob = cfg["hidden_dropout_prob"] self.dropout = nn.Dropout(p=prob) self.encoder_stack = ErnieEncoderStack(cfg, append_name(name, "encoder")) def forward( self, src_ids, sent_ids=None, pos_ids=None, input_mask=None, attn_bias=None, past_cache=None, use_causal_mask=False, ): """ Args: src_ids (Tensor): Indices of input sequence tokens in the vocabulary. They are numerical representations of tokens that build the input sequence. It's data type should be `int64` and has a shape of [batch_size, sequence_length]. sent_ids (Tensor, optional): Segment token indices to indicate different portions of the inputs. Its data type should be `int64` and it has a shape of [batch_size, sequence_length]. Defaults to `None`, which means we don't add segment embeddings. pos_ids(Tensor, optional): Indices of positions of each input sequence tokens in the position embeddings. Selected in the range ``[0, max_position_embeddings - 1]``. Shape as `[batch_size, num_tokens]` and dtype as int64. Defaults to `None`. input_mask(Tensor, optional): Mask used in multi-head attention to avoid performing attention on to some unwanted positions, usually the paddings or the subsequent positions. Its data type can be int, float and bool. When the data type is bool, the `masked` tokens have `False` values and the others have `True` values. When the data type is int, the `masked` tokens have `0` values and the others have `1` values. When the data type is float, the `masked` tokens have `-INF` values and the others have `0` values. It is a tensor with shape broadcasted to `[batch_size, num_attention_heads, sequence_length, sequence_length]`. For example, its shape can be [batch_size, sequence_length], [batch_size, sequence_length, sequence_length], [batch_size, num_attention_heads, sequence_length, sequence_length]. We use whole-word-mask in ERNIE, so the whole word will have the same value. For example, "使用" as a word, "使" and "用" will have the same value. Defaults to `None`, which means nothing needed to be prevented attention to. attn_bias(Tensor, optional): 3D version of `input_mask`, if set, overrides `input_mask`; if set not False, attention mask willed not be applied. past_cache(Tensor, optional, tuple of two lists: cached key and cached value, Each is a list of `Variable`s of shape `[batch_size, seq_len, hidden_size]`: cached key/value tensor that will be concated to generated key/value when performing self attention. if set, `attn_bias` should not be None. Returns: tuple: Returns tuple (`encoded`, `additional_info`). With the fields: - `encoded`(Tensor): The output logits of transformer stack. It's data type should be float32 and its shape is [batch_size, sequence_length, hidden_size]. - `additional_info` (dict): Additional middle level info, inclues all hidden stats and k/v caches. """ assert len(src_ids.shape) == 2, "expect src_ids.shape = [batch, sequecen], got %s" % (repr(src_ids.shape)) assert ( attn_bias is not None if past_cache else True ), "if `past_cache` is specified; attn_bias should not be None" d_seqlen = paddle.shape(src_ids)[1] if pos_ids is None: pos_ids = paddle.arange(0, d_seqlen, 1, dtype="int32").reshape([1, -1]).cast("int64") if attn_bias is None: if input_mask is None: input_mask = paddle.cast(src_ids != 0, "float32") assert len(input_mask.shape) == 2 input_mask = input_mask.unsqueeze(-1) attn_bias = input_mask.matmul(input_mask, transpose_y=True) if use_causal_mask: sequence = paddle.reshape(paddle.arange(0, d_seqlen, 1, dtype="float32") + 1.0, [1, 1, -1, 1]) causal_mask = (sequence.matmul(1.0 / sequence, transpose_y=True) >= 1.0).cast("float32") attn_bias *= causal_mask else: assert len(attn_bias.shape) == 3, "expect attn_bias tobe rank 3, got %r" % attn_bias.shape attn_bias = (1.0 - attn_bias) * -10000.0 attn_bias = attn_bias.unsqueeze(1).tile([1, self.n_head, 1, 1]) # avoid broadcast =_= if sent_ids is None: sent_ids = paddle.zeros_like(src_ids) src_embedded = self.word_emb(src_ids) pos_embedded = self.pos_emb(pos_ids) sent_embedded = self.sent_emb(sent_ids) embedded = src_embedded + pos_embedded + sent_embedded embedded = self.dropout(self.ln(embedded)) encoded, hidden_list, cache_list = self.encoder_stack(embedded, attn_bias, past_cache=past_cache) additional_info = { "hiddens": hidden_list, "caches": cache_list, } return encoded, additional_info
[文档]class ErnieForGeneration(ErnieModel): """ Ernie Model for sequence to sequence generation. This model inherits from :class:`~paddlenlp.transformers.ernie.modeling.ErnieModel`. Refer to the superclass documentation for the generic methods. """ def __init__(self, cfg, name=None): super(ErnieForGeneration, self).__init__(cfg, name=name) initializer = nn.initializer.TruncatedNormal(std=cfg["initializer_range"]) d_model = cfg["hidden_size"] d_vocab = cfg["vocab_size"] self.mlm = _build_linear( d_model, d_model, append_name(name, "mask_lm_trans_fc"), initializer, ) self.act = getattr(paddle.nn.functional, cfg["hidden_act"]) self.mlm_ln = _build_ln(d_model, name=append_name(name, "mask_lm_trans")) self.mlm_bias = paddle.create_parameter( dtype="float32", shape=[d_vocab], attr=paddle.ParamAttr( name=append_name(name, "mask_lm_out_fc.b_0"), initializer=nn.initializer.Constant(value=0.0) ), is_bias=True, )
[文档] def forward(self, *args, **kwargs): """ Args: tgt_labels(Tensor, optional): The ground truth target sequence id (hard label) or distribution (soft label). It's data type should be `int64` and has a shape of [batch_size, sequence_length] or [batch_size, sequence_length, sequence_length]. tgt_pos(Tensor, optional): Index of tgt_labels in `src_ids`. It's data type should be `int64` and has a shape of [n_targets, 2]). encode_only(bool, optional): Whether the model will output the logits or only encode the inputs. If `encode_only` is `True`, `loss` and `logits_2d` will not be returned. Returns: tuple: Returns tuple (`None`, `None`, `info`) if `encode_only` is `True`, returns (`output_ids`, `logits`, `info`) if `tgt_labels` or `tgt_pos` is `None`, else, returns (`loss`, `logits_2d`, `info`). With the fields: - `info`(dict): Middle level info, includes all hidden stats and k/v caches. - `output_ids`(Tensor): The output index. Its data type should be float32 and its shape is [batch_size]. If `encode_only`, returns None. - `logits`(Tensor): Logits for every targets. Its data type should be float32 and its shape is [batch_size, sequence_length]. If `encode_only`, returns None. - `loss`(Tensor): Cross entropy loss mean over every target label. If `encode_only`, returns None. - `logits_2d`(Tensor): Logits for every targets if `tgt_labels` or `tgt_pos` is not `None` . Its data type should be float32 and its shape is [batch_size, sequence_length]. """ tgt_labels = kwargs.pop("tgt_labels", None) tgt_pos = kwargs.pop("tgt_pos", None) encode_only = kwargs.pop("encode_only", False) encoded, info = ErnieModel.forward(self, *args, **kwargs) if encode_only: return None, None, info if tgt_labels is None or tgt_pos is None: encoded = self.act(self.mlm(encoded)) encoded = self.mlm_ln(encoded) logits = encoded.matmul(self.word_emb.weight, transpose_y=True) + self.mlm_bias output_ids = logits.argmax(-1) return output_ids, logits, info else: encoded_2d = encoded.gather_nd(tgt_pos) encoded_2d = self.act(self.mlm(encoded_2d)) encoded_2d = self.mlm_ln(encoded_2d) logits_2d = encoded_2d.matmul(self.word_emb.weight, transpose_y=True) + self.mlm_bias if len(tgt_labels.shape) == 1: tgt_labels = paddle.reshape(tgt_labels, [-1, 1]) loss = F.cross_entropy(logits_2d, tgt_labels, reduction="none", soft_label=(tgt_labels.shape[-1] != 1)) return loss, logits_2d, info
ErnieGenModel = ErnieForGeneration