# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
import os
import sys
import warnings
from functools import partial

import numpy as np
import paddle
from paddle.metric import Accuracy, Metric, Precision, Recall

__all__ = ['AccuracyAndF1', 'Mcc', 'PearsonAndSpearman', 'MultiLabelsMetric']

class AccuracyAndF1(Metric):
    """
    This class encapsulates Accuracy, Precision, Recall and F1 metric logic,
    and `accumulate` function returns accuracy, precision, recall and f1.
    The overview of all metrics could be seen at the document of `paddle.metric`
    for details.

    Args:
        topk (int or tuple(int), optional):
            Number of top elements to look at for computing accuracy.
            Defaults to (1,).
        pos_label (int, optional): The positive label for calculating precision
            and recall.
            Defaults to 1.
        name (str, optional):
            String name of the metric instance. Defaults to 'acc_and_f1'.

    Example:

        .. code-block::

            x = paddle.to_tensor([[0.1, 0.9], [0.5, 0.5], [0.6, 0.4], [0.7, 0.3]])
            y = paddle.to_tensor([[1], [0], [1], [1]])

            m = AccuracyAndF1()
            correct = m.compute(x, y)
            m.update(correct)
            res = m.accumulate()
            print(res) # (0.5, 0.5, 0.3333333333333333, 0.4, 0.45)

    """

    def __init__(self,
                 topk=(1, ),
                 pos_label=1,
                 name='acc_and_f1',
                 *args,
                 **kwargs):
        super(AccuracyAndF1, self).__init__(*args, **kwargs)
        self.topk = topk
        self.pos_label = pos_label
        self._name = name
        self.acc = Accuracy(self.topk, *args, **kwargs)
        self.precision = Precision(*args, **kwargs)
        self.recall = Recall(*args, **kwargs)
        self.reset()

    def compute(self, pred, label, *args):
        """
        Accepts network's output and the labels, and calculates the top-k
        (maximum value in topk) indices for accuracy.

        Args:
            pred (Tensor):
                Predicted tensor, and its dtype is float32 or float64, and
                has a shape of [batch_size, num_classes].
            label (Tensor):
                The ground truth tensor, and its dtype is int64, and has a
                shape of [batch_size, 1] or [batch_size, num_classes] in one
                hot representation.

        Returns:
            Tensor: Correct mask, each element indicates whether the prediction
            equals to the label. It's a tensor with a data type of float32 and
            has a shape of [batch_size, topk].

        """
        self.label = label
        # Cache the probability of the positive class; Precision/Recall
        # consume it in update(). Without this line update() would fail,
        # since preds_pos is reset to None in reset().
        self.preds_pos = paddle.nn.functional.softmax(pred)[:, self.pos_label]
        return self.acc.compute(pred, label)

    def update(self, correct, *args):
        """
        Updates the metrics states (accuracy, precision and recall), in order to
        calculate accumulated accuracy, precision and recall of all instances.

        Args:
            correct (Tensor):
                Correct mask for calculating accuracy, and it's a tensor with
                shape [batch_size, topk] and has a dtype of
                float32.

        """
        self.acc.update(correct)
        self.precision.update(self.preds_pos, self.label)
        self.recall.update(self.preds_pos, self.label)

    def accumulate(self):
        """
        Calculates and returns the accumulated metric.

        Returns:
            tuple: The accumulated metric. A tuple of shape (acc, precision,
            recall, f1, average_of_acc_and_f1)

            With the fields:

            - `acc` (numpy.float64):
                The accumulated accuracy.
            - `precision` (numpy.float64):
                The accumulated precision.
            - `recall` (numpy.float64):
                The accumulated recall.
            - `f1` (numpy.float64):
                The accumulated f1.
            - `average_of_acc_and_f1` (numpy.float64):
                The average of accumulated accuracy and f1.

        """
        acc = self.acc.accumulate()
        precision = self.precision.accumulate()
        recall = self.recall.accumulate()
        if precision == 0.0 or recall == 0.0:
            f1 = 0.0
        else:
            # 1/f1 = 1/2 * (1/precision + 1/recall)
            f1 = (2 * precision * recall) / (precision + recall)
        return (
            acc,
            precision,
            recall,
            f1,
            (acc + f1) / 2, )

    def reset(self):
        """
        Resets all metric states.
        """
        self.acc.reset()
        self.precision.reset()
        self.recall.reset()
        self.label = None
        self.preds_pos = None

    def name(self):
        """
        Returns name of the metric instance.

        Returns:
            str: The name of the metric instance.

        """
        return self._name

class Mcc(Metric):
    """
    This class calculates `Matthews correlation coefficient <https://en.wikipedia.org/wiki/Matthews_correlation_coefficient>`_ .

    Args:
        name (str, optional):
            String name of the metric instance. Defaults to 'mcc'.

    Example:

        .. code-block::

            x = paddle.to_tensor([[-0.1, 0.12], [-0.23, 0.23], [-0.32, 0.21], [-0.13, 0.23]])
            y = paddle.to_tensor([[1], [0], [1], [1]])

            m = Mcc()
            (preds, label) = m.compute(x, y)
            m.update((preds, label))
            res = m.accumulate()
            print(res) # (0.0,)

    """

    def __init__(self, name='mcc', *args, **kwargs):
        super(Mcc, self).__init__(*args, **kwargs)
        self._name = name
        self.tp = 0  # true positive
        self.fp = 0  # false positive
        self.tn = 0  # true negative
        self.fn = 0  # false negative

    def compute(self, pred, label, *args):
        """
        Processes the pred tensor, and returns the indices of the maximum of each
        sample.

        Args:
            pred (Tensor):
                The predicted value is a Tensor with dtype float32 or float64.
                Shape is [batch_size, 1].
            label (Tensor):
                The ground truth value is Tensor with dtype int64, and its
                shape is [batch_size, 1].

        Returns:
            tuple: A tuple of preds and label. Each shape is
            [batch_size, 1], with dtype float32 or float64.

        """
        # argsort in descending order puts the index of the largest logit
        # first; keeping only column 0 yields the per-sample argmax.
        preds = paddle.argsort(pred, descending=True)[:, :1]
        return (preds, label)

    def update(self, preds_and_labels):
        """
        Calculates states, i.e. the number of true positive, false positive,
        true negative and false negative samples.

        Args:
            preds_and_labels (tuple[Tensor]):
                Tuple of predicted value and the ground truth label, with dtype
                float32 or float64. Each shape is [batch_size, 1].

        """
        preds = preds_and_labels[0]
        labels = preds_and_labels[1]
        preds = preds.numpy()
        labels = labels.numpy().reshape(-1, 1)
        sample_num = labels.shape[0]
        for i in range(sample_num):
            pred = preds[i]
            label = labels[i]
            if pred == 1:
                if pred == label:
                    self.tp += 1
                else:
                    self.fp += 1
            else:
                if pred == label:
                    self.tn += 1
                else:
                    self.fn += 1

    def accumulate(self):
        """
        Calculates and returns the accumulated metric.

        Returns:
            tuple: Returns the accumulated metric, a tuple of shape (mcc,), `mcc` is the accumulated mcc and its data
            type is float64.

        """
        if self.tp == 0 or self.fp == 0 or self.tn == 0 or self.fn == 0:
            mcc = 0.0
        else:
            # mcc = (tp*tn-fp*fn)/ sqrt((tp+fp)(tp+fn)(tn+fp)(tn+fn))
            mcc = (self.tp * self.tn - self.fp * self.fn) / math.sqrt(
                (self.tp + self.fp) * (self.tp + self.fn) *
                (self.tn + self.fp) * (self.tn + self.fn))
        return (mcc, )

    def reset(self):
        """
        Resets all metric states.
        """
        self.tp = 0  # true positive
        self.fp = 0  # false positive
        self.tn = 0  # true negative
        self.fn = 0  # false negative

    def name(self):
        """
        Returns name of the metric instance.

        Returns:
            str: The name of the metric instance.

        """
        return self._name

class PearsonAndSpearman(Metric):
    """
    The class calculates `Pearson correlation coefficient <https://en.wikipedia.org/wiki/Pearson_correlation_coefficient>`_
    and `Spearman's rank correlation coefficient <https://en.wikipedia.org/wiki/Spearman%27s_rank_correlation_coefficient>`_ .

    Args:
        name (str, optional):
            String name of the metric instance. Defaults to 'pearson_and_spearman'.

    Example:

        .. code-block::

            x = paddle.to_tensor([[0.1], [1.0], [2.4], [0.9]])
            y = paddle.to_tensor([[0.0], [1.0], [2.9], [1.0]])

            m = PearsonAndSpearman()
            m.update((x, y))
            res = m.accumulate()
            print(res) # (0.9985229081857804, 1.0, 0.9992614540928901)

    """

    def __init__(self, name='pearson_and_spearman', *args, **kwargs):
        super(PearsonAndSpearman, self).__init__(*args, **kwargs)
        self._name = name
        self.preds = []
        self.labels = []

    def update(self, preds_and_labels):
        """
        Ensures the type of preds and labels is numpy.ndarray and reshapes them
        into [-1, 1].

        Args:
            preds_and_labels (tuple[Tensor] or list[Tensor]):
                Tuple or list of predicted value and the ground truth label.
                Its data type should be float32 or float64 and its shape is [batch_size, d0, ..., dN].

        """
        preds = preds_and_labels[0]
        labels = preds_and_labels[1]
        preds = preds.numpy()
        labels = labels.numpy()
        preds = np.squeeze(preds.reshape(-1, 1)).tolist()
        labels = np.squeeze(labels.reshape(-1, 1)).tolist()
        self.preds.append(preds)
        self.labels.append(labels)

    def accumulate(self):
        """
        Calculates and returns the accumulated metric.

        Returns:
            tuple: Returns the accumulated metric, a tuple of (pearson, spearman,
            the_average_of_pearson_and_spearman).

            With the fields:

            - `pearson` (numpy.float64):
                The accumulated pearson.

            - `spearman` (numpy.float64):
                The accumulated spearman.

            - `the_average_of_pearson_and_spearman` (numpy.float64):
                The average of accumulated pearson and spearman correlation
                coefficient.

        """
        # Flatten the per-batch lists collected by update() into one sequence.
        preds = [item for sublist in self.preds for item in sublist]
        labels = [item for sublist in self.labels for item in sublist]
        pearson = self.pearson(preds, labels)
        spearman = self.spearman(preds, labels)
        return (
            pearson,
            spearman,
            (pearson + spearman) / 2, )

    def pearson(self, preds, labels):
        # Sample Pearson correlation computed from raw sums; returns 0.0
        # when either sequence has zero variance (denominator is 0).
        n = len(preds)
        # simple sums
        sum1 = sum(float(preds[i]) for i in range(n))
        sum2 = sum(float(labels[i]) for i in range(n))
        # sum up the squares
        sum1_pow = sum([pow(v, 2.0) for v in preds])
        sum2_pow = sum([pow(v, 2.0) for v in labels])
        # sum up the products
        p_sum = sum([preds[i] * labels[i] for i in range(n)])

        numerator = p_sum - (sum1 * sum2 / n)
        denominator = math.sqrt(
            (sum1_pow - pow(sum1, 2) / n) * (sum2_pow - pow(sum2, 2) / n))
        if denominator == 0:
            return 0.0
        return numerator / denominator

    def spearman(self, preds, labels):
        # Classic Spearman formula on ranks: 1 - 6*sum(d^2)/(n*(n^2-1)).
        # NOTE(review): get_rank does not average ranks of ties, so tied
        # values get distinct ranks.
        preds_rank = self.get_rank(preds)
        labels_rank = self.get_rank(labels)

        total = 0
        n = len(preds)
        for i in range(n):
            total += pow((preds_rank[i] - labels_rank[i]), 2)
        spearman = 1 - float(6 * total) / (n * (pow(n, 2) - 1))
        return spearman

    def get_rank(self, raw_list):
        # Rank 1 is assigned to the largest value (descending argsort).
        x = np.array(raw_list)
        r_x = np.empty(x.shape, dtype=int)
        y = np.argsort(-x)
        for i, k in enumerate(y):
            r_x[k] = i + 1
        return r_x

    def reset(self):
        """
        Resets all metric states.
        """
        self.preds = []
        self.labels = []

    def name(self):
        """
        Returns name of the metric instance.

        Returns:
            str: The name of the metric instance.

        """
        return self._name

class MultiLabelsMetric(Metric):
    """
    This class encapsulates Accuracy, Precision, Recall and F1 metric logic in
    multi-labels setting (also the binary setting).
    Some codes are taken and modified from sklearn.metrics .

    Args:
        num_labels (int):
            The total number of labels which is usually the number of classes
        name (str, optional):
            String name of the metric instance. Defaults to 'multi_labels_metric'.

    Example:

        .. code-block::

            x = paddle.to_tensor([[0.1, 0.2, 0.9], [0.5, 0.8, 0.5], [0.6, 1.5, 0.4], [2.8, 0.7, 0.3]])
            y = paddle.to_tensor([[2], [1], [2], [1]])

            m = MultiLabelsMetric(num_labels=3)
            args = m.compute(x, y)
            m.update(args)

            result1 = m.accumulate(average=None)
            # (array([0.0, 0.5, 1.0]), array([0.0, 0.5, 0.5]), array([0.0, 0.5, 0.66666667]))
            result2 = m.accumulate(average='binary', pos_label=0)
            # (0.0, 0.0, 0.0)
            result3 = m.accumulate(average='binary', pos_label=1)
            # (0.5, 0.5, 0.5)
            result4 = m.accumulate(average='binary', pos_label=2)
            # (1.0, 0.5, 0.6666666666666666)
            result5 = m.accumulate(average='micro')
            # (0.5, 0.5, 0.5)
            result6 = m.accumulate(average='macro')
            # (0.5, 0.3333333333333333, 0.38888888888888884)
            result7 = m.accumulate(average='weighted')
            # (0.75, 0.5, 0.5833333333333333)

    Note: When zero_division is encountered (details as followed), the corresponding metrics will be set to 0.0
        precision is zero_division if there are no positive predictions
        recall is zero_division if there are no positive labels
        fscore is zero_division if all labels AND predictions are negative
    """

    def __init__(self, num_labels, name='multi_labels_metric'):
        super(MultiLabelsMetric, self).__init__()
        if num_labels <= 1:
            raise ValueError(
                f"The num_labels is {num_labels}, which must be greater than 1.")
        self.num_labels = num_labels
        self._name = name
        # One 2x2 (binary) confusion matrix per label: [[tn, fp], [fn, tp]].
        self._confusion_matrix = np.zeros((num_labels, 2, 2), dtype=int)

    def update(self, args):
        """
        Updates the metrics states (accuracy, precision and recall), in order to
        calculate accumulated accuracy, precision and recall of all instances.

        Args:
            args (tuple of Tensor):
                the tuple returned from `compute` function
        """
        pred = args[0].numpy()
        label = args[1].numpy()
        tmp_confusion_matrix = self._multi_labels_confusion_matrix(pred, label)
        self._confusion_matrix += tmp_confusion_matrix

    def accumulate(self, average=None, pos_label=1):
        """
        Calculates and returns the accumulated metric.

        Args:
            average (str in {'binary', 'micro', 'macro', 'weighted'} or None, optional):
                Defaults to `None`. If `None`, the scores for each class are returned.
                Otherwise, this determines the type of averaging performed on the data:

                - `binary` :
                    Only report results for the class specified by pos_label.

                - `micro` :
                    Calculate metrics globally by counting the total true positives,
                    false negatives and false positives.

                - `macro` :
                    Calculate metrics for each label, and find their unweighted mean.
                    This does not take label imbalance into account.

                - `weighted` :
                    Calculate metrics for each label, and find their average weighted
                    by support (the number of true instances for each label). This
                    alters `macro` to account for label imbalance; it can result in
                    an F-score that is not between precision and recall.

            pos_label (int, optional):
                The positive label for calculating precision and recall in binary settings.
                Noted: Only when `average='binary'`, this arguments will be used. Otherwise,
                it will be ignored.
                Defaults to 1.

        Returns:
            tuple: The accumulated metric. A tuple of shape (precision, recall, f1)
            With the fields:

            - `precision` (numpy.float64 or numpy.ndarray if average=None):
                The accumulated precision.
            - `recall` (numpy.float64 or numpy.ndarray if average=None):
                The accumulated recall.
            - `f1` (numpy.float64 or numpy.ndarray if average=None):
                The accumulated f1.

        """
        if average not in {'binary', 'micro', 'macro', 'weighted', None}:
            raise ValueError(f"The average is {average}, which is unknown.")
        if average == 'binary':
            if pos_label >= self.num_labels:
                raise ValueError(
                    f"The pos_label is {pos_label}, num_labels is {self.num_labels}. "
                    f"The num_labels must be greater than pos_label.")

        confusion_matrix = None  # [*, 2, 2]
        if average == 'binary':
            confusion_matrix = np.expand_dims(
                self._confusion_matrix[pos_label], axis=0)
        elif average == 'micro':
            confusion_matrix = self._confusion_matrix.sum(axis=0, keepdims=True)
        #  if average is 'macro' or 'weighted' or None
        else:
            confusion_matrix = self._confusion_matrix

        tp = confusion_matrix[:, 1, 1]  # [*,]
        pred = tp + confusion_matrix[:, 0, 1]  # [*,] positive predictions
        true = tp + confusion_matrix[:, 1, 0]  # [*,] positive labels (support)

        def _robust_divide(numerator, denominator, metric_name):
            # Element-wise division that maps x/0 to 0.0 and emits a warning
            # instead of raising / producing inf.
            mask = denominator == 0.0
            denominator = denominator.copy()
            denominator[mask] = 1  # avoid zero division
            result = numerator / denominator
            if not mask.any():
                return result

            # precision is zero_division if there are no positive predictions
            # recall is zero_division if there are no positive labels
            # fscore is zero_division if all labels AND predictions are negative
            warnings.warn(f'Zero division when calculating {metric_name}.',
                          UserWarning)
            result[mask] = 0.0
            return result

        precision = _robust_divide(tp, pred, 'precision')
        recall = _robust_divide(tp, true, 'recall')
        f1 = _robust_divide(2 * (precision * recall), (precision + recall),
                            'f1')

        weights = None  # [num_labels]
        if average == 'weighted':
            weights = true
            if weights.sum() == 0:
                # No positive labels at all: recall (and f1) are zero-division;
                # precision is zero-division too only if nothing was predicted.
                zero_division_value = np.float64(0.0)
                if pred.sum() == 0:
                    return (zero_division_value, zero_division_value,
                            zero_division_value)
                else:
                    return (np.float64(0.0), zero_division_value,
                            np.float64(0.0))
        elif average == 'macro':
            weights = np.ones((self.num_labels), dtype=float)
        if average is not None:
            precision = np.average(precision, weights=weights)
            recall = np.average(recall, weights=weights)
            f1 = np.average(f1, weights=weights)

        return precision, recall, f1

    def compute(self, pred, label):
        """
        Accepts network's output and the labels, and calculates the top-k
        (maximum value in topk) indices for accuracy.

        Args:
            pred (Tensor):
                Predicted tensor, and its dtype is float32 or float64, and
                has a shape of [batch_size, *, num_labels].
            label (Tensor):
                The ground truth tensor, and its dtype is int64, and has a
                shape of [batch_size, *] or [batch_size, *, num_labels] in one
                hot representation.

        Returns:
            tuple of Tensor: it contains two Tensor of shape [*, 1].
            The tuple should be passed to `update` function.
        """
        if not (isinstance(pred, paddle.Tensor) and
                isinstance(label, paddle.Tensor)):
            raise ValueError('pred and label must be paddle tensor')

        if pred.shape[-1] != self.num_labels:
            raise ValueError(f'The last dim of pred is {pred.shape[-1]}, '
                             f'which should be num_labels')
        # Collapse leading dims and take the argmax class per sample.
        pred = paddle.reshape(pred, [-1, self.num_labels])
        pred = paddle.argmax(pred, axis=-1)

        if label.shape[-1] == self.num_labels:
            # One-hot labels: convert to class indices.
            label = paddle.reshape(label, [-1, self.num_labels])
            label = paddle.argmax(label, axis=-1)
        else:
            label = paddle.reshape(label, [-1])
            if paddle.max(label) >= self.num_labels:
                raise ValueError(f"Tensor label has value {paddle.max(label)}, "
                                 f"which is no less than num_labels")

        if pred.shape != label.shape:
            raise ValueError(
                f"The length of pred is not equal to the length of label")

        return pred, label

    def _multi_labels_confusion_matrix(self, pred, label):
        # Per-label binary confusion counts via bincount; pred/label are
        # 1-D integer class-index arrays of the same length.
        tp_bins = label[pred == label]
        tp = np.bincount(tp_bins, minlength=self.num_labels)  # [num_labels,]
        tp_plus_fp = np.bincount(
            pred, minlength=self.num_labels)  # [num_labels,]
        tp_plus_fn = np.bincount(
            label, minlength=self.num_labels)  # [num_labels,]
        fp = tp_plus_fp - tp  # [num_labels,]
        fn = tp_plus_fn - tp  # [num_labels,]
        tn = pred.shape[0] - tp - fp - fn  # [num_labels,]
        return np.array([tn, fp, fn, tp]).T.reshape(-1, 2,
                                                    2)  # [num_labels, 2, 2]

    def reset(self):
        """
        Resets all metric states.
        """
        self._confusion_matrix = np.zeros((self.num_labels, 2, 2), dtype=int)

    def name(self):
        """
        Returns name of the metric instance.

        Returns:
            str: The name of the metric instance.

        """
        return self._name