# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
import warnings
import numpy as np
import paddle
from paddle.metric import Accuracy, Metric, Precision, Recall
__all__ = ["AccuracyAndF1", "Mcc", "PearsonAndSpearman", "MultiLabelsMetric"]
class AccuracyAndF1(Metric):
"""
    This class encapsulates Accuracy, Precision, Recall and F1 metric logic,
    and its `accumulate` method returns the accuracy, precision, recall and f1.
    An overview of all metrics can be found in the `paddle.metric
    <https://www.paddlepaddle.org.cn/documentation/docs/zh/develop/api/paddle/metric/Overview_cn.html>`_
    documentation.
Args:
topk (int or tuple(int), optional):
Number of top elements to look at for computing accuracy.
Defaults to (1,).
pos_label (int, optional): The positive label for calculating precision
and recall.
Defaults to 1.
name (str, optional):
String name of the metric instance. Defaults to 'acc_and_f1'.
Example:
.. code-block::
import paddle
from paddlenlp.metrics import AccuracyAndF1
x = paddle.to_tensor([[0.1, 0.9], [0.5, 0.5], [0.6, 0.4], [0.7, 0.3]])
y = paddle.to_tensor([[1], [0], [1], [1]])
m = AccuracyAndF1()
correct = m.compute(x, y)
m.update(correct)
res = m.accumulate()
print(res) # (0.5, 0.5, 0.3333333333333333, 0.4, 0.45)
"""
def __init__(self, topk=(1,), pos_label=1, name="acc_and_f1", *args, **kwargs):
super(AccuracyAndF1, self).__init__(*args, **kwargs)
self.topk = topk
self.pos_label = pos_label
self._name = name
self.acc = Accuracy(self.topk, *args, **kwargs)
self.precision = Precision(*args, **kwargs)
self.recall = Recall(*args, **kwargs)
self.reset()
    def compute(self, pred, label, *args):
"""
Accepts network's output and the labels, and calculates the top-k
(maximum value in topk) indices for accuracy.
Args:
pred (Tensor):
Predicted tensor, and its dtype is float32 or float64, and
has a shape of [batch_size, num_classes].
label (Tensor):
                The ground truth tensor, and its dtype is int64, and has a
shape of [batch_size, 1] or [batch_size, num_classes] in one
hot representation.
Returns:
            Tensor: Correct mask, where each element indicates whether the
            prediction equals the label. It is a tensor with a data type of
            float32 and has a shape of [batch_size, topk].
"""
self.label = label
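        # Cache the labels and the positive-class softmax probability here so that
        # `update` can pass them on to the Precision and Recall metrics.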
self.preds_pos = paddle.nn.functional.softmax(pred)[:, self.pos_label]
return self.acc.compute(pred, label)
    def update(self, correct, *args):
"""
Updates the metrics states (accuracy, precision and recall), in order to
calculate accumulated accuracy, precision and recall of all instances.
Args:
correct (Tensor):
Correct mask for calculating accuracy, and it's a tensor with
shape [batch_size, topk] and has a dtype of
float32.
"""
self.acc.update(correct)
self.precision.update(self.preds_pos, self.label)
self.recall.update(self.preds_pos, self.label)
    def accumulate(self):
"""
Calculates and returns the accumulated metric.
Returns:
tuple: The accumulated metric. A tuple of shape (acc, precision,
recall, f1, average_of_acc_and_f1)
With the fields:
- `acc` (numpy.float64):
The accumulated accuracy.
- `precision` (numpy.float64):
The accumulated precision.
- `recall` (numpy.float64):
The accumulated recall.
- `f1` (numpy.float64):
The accumulated f1.
- `average_of_acc_and_f1` (numpy.float64):
The average of accumulated accuracy and f1.
"""
acc = self.acc.accumulate()
precision = self.precision.accumulate()
recall = self.recall.accumulate()
if precision == 0.0 or recall == 0.0:
f1 = 0.0
else:
# 1/f1 = 1/2 * (1/precision + 1/recall)
f1 = (2 * precision * recall) / (precision + recall)
return (
acc,
precision,
recall,
f1,
(acc + f1) / 2,
)
    def reset(self):
"""
Resets all metric states.
"""
self.acc.reset()
self.precision.reset()
self.recall.reset()
self.label = None
self.preds_pos = None
    def name(self):
"""
Returns name of the metric instance.
Returns:
str: The name of the metric instance.
"""
return self._name
class Mcc(Metric):
"""
This class calculates `Matthews correlation coefficient <https://en.wikipedia.org/wiki/Matthews_correlation_coefficient>`_ .
Args:
name (str, optional):
String name of the metric instance. Defaults to 'mcc'.
Example:
.. code-block::
import paddle
from paddlenlp.metrics import Mcc
x = paddle.to_tensor([[-0.1, 0.12], [-0.23, 0.23], [-0.32, 0.21], [-0.13, 0.23]])
y = paddle.to_tensor([[1], [0], [1], [1]])
m = Mcc()
(preds, label) = m.compute(x, y)
m.update((preds, label))
res = m.accumulate()
print(res) # (0.0,)
"""
def __init__(self, name="mcc", *args, **kwargs):
super(Mcc, self).__init__(*args, **kwargs)
self._name = name
self.tp = 0 # true positive
self.fp = 0 # false positive
self.tn = 0 # true negative
self.fn = 0 # false negative
    def compute(self, pred, label, *args):
"""
Processes the pred tensor, and returns the indices of the maximum of each
sample.
        Args:
            pred (Tensor):
                The predicted value is a Tensor with dtype float32 or float64
                and a shape of [batch_size, num_classes].
            label (Tensor):
                The ground truth value is a Tensor with dtype int64, and its
                shape is [batch_size, 1].
        Returns:
            tuple: A tuple of `preds` and `label`. Each has a shape of
            [batch_size, 1]; `preds` holds the int64 index of each sample's maximum.
"""
preds = paddle.argsort(pred, descending=True)[:, :1]
return (preds, label)
    def update(self, preds_and_labels):
"""
Calculates states, i.e. the number of true positive, false positive,
true negative and false negative samples.
Args:
preds_and_labels (tuple[Tensor]):
Tuple of predicted value and the ground truth label, with dtype
float32 or float64. Each shape is [batch_size, 1].
"""
preds = preds_and_labels[0]
labels = preds_and_labels[1]
if isinstance(preds, paddle.Tensor):
preds = preds.numpy()
if isinstance(labels, paddle.Tensor):
labels = labels.numpy().reshape(-1, 1)
sample_num = labels.shape[0]
for i in range(sample_num):
pred = preds[i]
label = labels[i]
if pred == 1:
if pred == label:
self.tp += 1
else:
self.fp += 1
else:
if pred == label:
self.tn += 1
else:
self.fn += 1
    def accumulate(self):
"""
Calculates and returns the accumulated metric.
Returns:
tuple: Returns the accumulated metric, a tuple of shape (mcc,), `mcc` is the accumulated mcc and its data
type is float64.
"""
if self.tp == 0 or self.fp == 0 or self.tn == 0 or self.fn == 0:
mcc = 0.0
else:
            # mcc = (tp * tn - fp * fn) / sqrt((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn))
mcc = (self.tp * self.tn - self.fp * self.fn) / math.sqrt(
(self.tp + self.fp) * (self.tp + self.fn) * (self.tn + self.fp) * (self.tn + self.fn)
)
return (mcc,)
    def reset(self):
"""
Resets all metric states.
"""
self.tp = 0 # true positive
self.fp = 0 # false positive
self.tn = 0 # true negative
self.fn = 0 # false negative
    def name(self):
"""
Returns name of the metric instance.
Returns:
str: The name of the metric instance.
"""
return self._name
class PearsonAndSpearman(Metric):
"""
    This class calculates `Pearson correlation coefficient <https://en.wikipedia.org/wiki/Pearson_correlation_coefficient>`_
and `Spearman's rank correlation coefficient <https://en.wikipedia.org/wiki/Spearman%27s_rank_correlation_coefficient>`_ .
Args:
name (str, optional):
String name of the metric instance. Defaults to 'pearson_and_spearman'.
Example:
.. code-block::
import paddle
from paddlenlp.metrics import PearsonAndSpearman
x = paddle.to_tensor([[0.1], [1.0], [2.4], [0.9]])
y = paddle.to_tensor([[0.0], [1.0], [2.9], [1.0]])
m = PearsonAndSpearman()
m.update((x, y))
res = m.accumulate()
print(res) # (0.9985229081857804, 1.0, 0.9992614540928901)
"""
def __init__(self, name="pearson_and_spearman", *args, **kwargs):
super(PearsonAndSpearman, self).__init__(*args, **kwargs)
self._name = name
self.preds = []
self.labels = []
    def update(self, preds_and_labels):
"""
Ensures the type of preds and labels is numpy.ndarray and reshapes them
into [-1, 1].
Args:
preds_and_labels (tuple[Tensor] or list[Tensor]):
Tuple or list of predicted value and the ground truth label.
Its data type should be float32 or float64 and its shape is [batch_size, d0, ..., dN].
"""
preds = preds_and_labels[0]
labels = preds_and_labels[1]
if isinstance(preds, paddle.Tensor):
preds = preds.numpy()
if isinstance(labels, paddle.Tensor):
labels = labels.numpy()
preds = np.squeeze(preds.reshape(-1, 1)).tolist()
labels = np.squeeze(labels.reshape(-1, 1)).tolist()
self.preds.append(preds)
self.labels.append(labels)
    def accumulate(self):
"""
Calculates and returns the accumulated metric.
Returns:
tuple: Returns the accumulated metric, a tuple of (pearson, spearman,
the_average_of_pearson_and_spearman).
With the fields:
- `pearson` (numpy.float64):
The accumulated pearson.
- `spearman` (numpy.float64):
The accumulated spearman.
- `the_average_of_pearson_and_spearman` (numpy.float64):
The average of accumulated pearson and spearman correlation
coefficient.
"""
preds = [item for sublist in self.preds for item in sublist]
labels = [item for sublist in self.labels for item in sublist]
pearson = self.pearson(preds, labels)
spearman = self.spearman(preds, labels)
return (
pearson,
spearman,
(pearson + spearman) / 2,
)
def pearson(self, preds, labels):
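        """
        Computes Pearson's r via the single-pass computational formula
        r = (sum(xy) - sum(x) * sum(y) / n) /
            sqrt((sum(x^2) - sum(x)^2 / n) * (sum(y^2) - sum(y)^2 / n)),
        returning 0.0 when the denominator is zero.
        """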
n = len(preds)
# simple sums
sum1 = sum(float(preds[i]) for i in range(n))
sum2 = sum(float(labels[i]) for i in range(n))
# sum up the squares
sum1_pow = sum([pow(v, 2.0) for v in preds])
sum2_pow = sum([pow(v, 2.0) for v in labels])
# sum up the products
p_sum = sum([preds[i] * labels[i] for i in range(n)])
numerator = p_sum - (sum1 * sum2 / n)
denominator = math.sqrt((sum1_pow - pow(sum1, 2) / n) * (sum2_pow - pow(sum2, 2) / n))
if denominator == 0:
return 0.0
return numerator / denominator
def spearman(self, preds, labels):
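        """
        Computes Spearman's rank correlation as
        rho = 1 - 6 * sum(d_i^2) / (n * (n^2 - 1)),
        where d_i is the rank difference of the i-th pair (see `get_rank`).
        """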
preds_rank = self.get_rank(preds)
labels_rank = self.get_rank(labels)
total = 0
n = len(preds)
for i in range(n):
total += pow((preds_rank[i] - labels_rank[i]), 2)
spearman = 1 - float(6 * total) / (n * (pow(n, 2) - 1))
return spearman
def get_rank(self, raw_list):
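        """
        Returns 1-based ranks of `raw_list` in descending order (the largest value
        gets rank 1). Ties receive distinct consecutive ranks in sort order rather
        than being averaged.
        """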
x = np.array(raw_list)
r_x = np.empty(x.shape, dtype=int)
y = np.argsort(-x)
for i, k in enumerate(y):
r_x[k] = i + 1
return r_x
    def reset(self):
"""
Resets all metric states.
"""
self.preds = []
self.labels = []
    def name(self):
"""
Returns name of the metric instance.
Returns:
str: The name of the metric instance.
"""
return self._name
class MultiLabelsMetric(Metric):
"""
This class encapsulates Accuracy, Precision, Recall and F1 metric logic in
    the multi-label setting (and also the binary setting).
    Some code is taken and modified from `sklearn.metrics`.
    Args:
        num_labels (int):
            The total number of labels, which is usually the number of classes.
name (str, optional):
String name of the metric instance. Defaults to 'multi_labels_metric'.
Example:
.. code-block::
import paddle
from paddlenlp.metrics import MultiLabelsMetric
x = paddle.to_tensor([[0.1, 0.2, 0.9], [0.5, 0.8, 0.5], [0.6, 1.5, 0.4], [2.8, 0.7, 0.3]])
y = paddle.to_tensor([[2], [1], [2], [1]])
m = MultiLabelsMetric(num_labels=3)
args = m.compute(x, y)
m.update(args)
result1 = m.accumulate(average=None)
# (array([0.0, 0.5, 1.0]), array([0.0, 0.5, 0.5]), array([0.0, 0.5, 0.66666667]))
result2 = m.accumulate(average='binary', pos_label=0)
# (0.0, 0.0, 0.0)
result3 = m.accumulate(average='binary', pos_label=1)
# (0.5, 0.5, 0.5)
result4 = m.accumulate(average='binary', pos_label=2)
# (1.0, 0.5, 0.6666666666666666)
result5 = m.accumulate(average='micro')
# (0.5, 0.5, 0.5)
result6 = m.accumulate(average='macro')
# (0.5, 0.3333333333333333, 0.38888888888888884)
result7 = m.accumulate(average='weighted')
# (0.75, 0.5, 0.5833333333333333)
    Note: When zero_division is encountered (details as follows), the corresponding metric will be set to 0.0:
        precision is zero_division if there are no positive predictions;
        recall is zero_division if there are no positive labels;
        fscore is zero_division if all labels AND predictions are negative.
"""
def __init__(self, num_labels, name="multi_labels_metric"):
super(MultiLabelsMetric, self).__init__()
if num_labels <= 1:
raise ValueError(f"The num_labels is {num_labels}, which must be greater than 1.")
self.num_labels = num_labels
self._name = name
self._confusion_matrix = np.zeros((num_labels, 2, 2), dtype=int)
    def update(self, args):
"""
        Updates the metric states (the per-label confusion matrix), in order to
        calculate the accumulated precision, recall and f1 of all instances.
Args:
args (tuple of Tensor):
the tuple returned from `compute` function
"""
pred = args[0].numpy()
label = args[1].numpy()
tmp_confusion_matrix = self._multi_labels_confusion_matrix(pred, label)
self._confusion_matrix += tmp_confusion_matrix
    def accumulate(self, average=None, pos_label=1):
"""
Calculates and returns the accumulated metric.
Args:
            average (str in {'binary', 'micro', 'macro', 'weighted'} or None, optional):
Defaults to `None`. If `None`, the scores for each class are returned.
Otherwise, this determines the type of averaging performed on the data:
- `binary` :
Only report results for the class specified by pos_label.
- `micro` :
Calculate metrics globally by counting the total true positives,
false negatives and false positives.
- `macro` :
Calculate metrics for each label, and find their unweighted mean.
This does not take label imbalance into account.
- `weighted` :
Calculate metrics for each label, and find their average weighted
by support (the number of true instances for each label). This
alters `macro` to account for label imbalance; it can result in
an F-score that is not between precision and recall.
pos_label (int, optional):
The positive label for calculating precision and recall in binary settings.
                Note: this argument is only used when `average='binary'`. Otherwise,
                it will be ignored.
Defaults to 1.
Returns:
tuple: The accumulated metric. A tuple of shape (precision, recall, f1)
With the fields:
- `precision` (numpy.float64 or numpy.ndarray if average=None):
The accumulated precision.
- `recall` (numpy.float64 or numpy.ndarray if average=None):
The accumulated recall.
- `f1` (numpy.float64 or numpy.ndarray if average=None):
The accumulated f1.
"""
if average not in {"binary", "micro", "macro", "weighted", None}:
raise ValueError(f"The average is {average}, which is unknown.")
if average == "binary":
if pos_label >= self.num_labels:
raise ValueError(
f"The pos_label is {pos_label}, num_labels is {self.num_labels}. "
f"The num_labels must be greater than pos_label."
)
confusion_matrix = None # [*, 2, 2]
if average == "binary":
confusion_matrix = np.expand_dims(self._confusion_matrix[pos_label], axis=0)
elif average == "micro":
confusion_matrix = self._confusion_matrix.sum(axis=0, keepdims=True)
# if average is 'macro' or 'weighted' or None
else:
confusion_matrix = self._confusion_matrix
tp = confusion_matrix[:, 1, 1] # [*,]
pred = tp + confusion_matrix[:, 0, 1] # [*,]
true = tp + confusion_matrix[:, 1, 0] # [*,]
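        # `pred` is the number of predicted positives per label (tp + fp);
        # `true` is the number of actual positives per label (tp + fn), i.e. the support.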
def _robust_divide(numerator, denominator, metric_name):
mask = denominator == 0.0
denominator = denominator.copy()
denominator[mask] = 1 # avoid zero division
result = numerator / denominator
if not np.any(mask):
return result
# precision is zero_division if there are no positive predictions
# recall is zero_division if there are no positive labels
# fscore is zero_division if all labels AND predictions are negative
warnings.warn(f"Zero division when calculating {metric_name}.", UserWarning)
result[mask] = 0.0
return result
precision = _robust_divide(tp, pred, "precision")
recall = _robust_divide(tp, true, "recall")
f1 = _robust_divide(2 * (precision * recall), (precision + recall), "f1")
weights = None # [num_labels]
if average == "weighted":
weights = true
if weights.sum() == 0:
zero_division_value = np.float64(0.0)
if pred.sum() == 0:
return (zero_division_value, zero_division_value, zero_division_value)
else:
return (np.float64(0.0), zero_division_value, np.float64(0.0))
elif average == "macro":
weights = np.ones((self.num_labels), dtype=float)
if average is not None:
precision = np.average(precision, weights=weights)
recall = np.average(recall, weights=weights)
f1 = np.average(f1, weights=weights)
return precision, recall, f1
    def compute(self, pred, label):
"""
        Accepts the network's output and the labels, flattens them, and takes the
        argmax over the label dimension to get the per-sample predictions.
Args:
pred (Tensor):
Predicted tensor, and its dtype is float32 or float64, and
has a shape of [batch_size, *, num_labels].
label (Tensor):
                The ground truth tensor, and its dtype is int64, and has a
shape of [batch_size, *] or [batch_size, *, num_labels] in one
hot representation.
Returns:
            tuple of Tensor: it contains two 1-D Tensors whose length is the
            number of flattened samples. The tuple should be passed to the `update` function.
"""
if not (paddle.is_tensor(pred) and paddle.is_tensor(label)):
raise ValueError("pred and label must be paddle tensor")
if pred.shape[-1] != self.num_labels:
raise ValueError(f"The last dim of pred is {pred.shape[-1]}, " f"which should be num_labels")
pred = paddle.reshape(pred, [-1, self.num_labels])
pred = paddle.argmax(pred, axis=-1)
if label.shape[-1] == self.num_labels:
label = paddle.reshape(label, [-1, self.num_labels])
label = paddle.argmax(label, axis=-1)
else:
label = paddle.reshape(label, [-1])
if paddle.max(label) >= self.num_labels:
raise ValueError(f"Tensor label has value {paddle.max(label)}, " f"which is no less than num_labels")
if pred.shape[0] != label.shape[0]:
raise ValueError("The length of pred is not equal to the length of label")
return pred, label
def _multi_labels_confusion_matrix(self, pred, label):
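        """
        Builds a one-vs-rest 2x2 confusion matrix for every label from the flattened
        predictions and labels. Entry [i] is laid out as [[tn, fp], [fn, tp]] for label i.
        """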
tp_bins = label[pred == label]
tp = np.bincount(tp_bins, minlength=self.num_labels) # [num_labels,]
tp_plus_fp = np.bincount(pred, minlength=self.num_labels) # [num_labels,]
tp_plus_fn = np.bincount(label, minlength=self.num_labels) # [num_labels,]
fp = tp_plus_fp - tp # [num_labels,]
fn = tp_plus_fn - tp # [num_labels,]
tn = pred.shape[0] - tp - fp - fn # [num_labels,]
return np.array([tn, fp, fn, tp]).T.reshape(-1, 2, 2) # [num_labels, 2, 2]
    def reset(self):
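        """
        Resets all metric states.
        """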
self._confusion_matrix = np.zeros((self.num_labels, 2, 2), dtype=int)
    def name(self):
"""
Returns name of the metric instance.
Returns:
str: The name of the metric instance.
"""
return self._name