323 lines
12 KiB
Python
323 lines
12 KiB
Python
|
"""
|
||
|
A Metric observes output of certain model, for example, in form of logits or
|
||
|
scores, and accumulates a particular metric with reference to some provided
|
||
|
targets. In context of VisDial, we use Recall (@ 1, 5, 10), Mean Rank, Mean
|
||
|
Reciprocal Rank (MRR) and Normalized Discounted Cumulative Gain (NDCG).
|
||
|
|
||
|
Each ``Metric`` must atleast implement three methods:
|
||
|
- ``observe``, update accumulated metric with currently observed outputs
|
||
|
and targets.
|
||
|
- ``retrieve`` to return the accumulated metric., an optionally reset
|
||
|
internally accumulated metric (this is commonly done between two epochs
|
||
|
after validation).
|
||
|
- ``reset`` to explicitly reset the internally accumulated metric.
|
||
|
|
||
|
Caveat, if you wish to implement your own class of Metric, make sure you call
|
||
|
``detach`` on output tensors (like logits), else it will cause memory leaks.
|
||
|
"""
|
||
|
import torch
|
||
|
import torch.distributed as dist
|
||
|
import numpy as np
|
||
|
|
||
|
def scores_to_ranks(scores: torch.Tensor):
|
||
|
"""Convert model output scores into ranks."""
|
||
|
batch_size, num_rounds, num_options = scores.size()
|
||
|
scores = scores.view(-1, num_options)
|
||
|
|
||
|
# sort in descending order - largest score gets highest rank
|
||
|
sorted_ranks, ranked_idx = scores.sort(1, descending=True)
|
||
|
|
||
|
# i-th position in ranked_idx specifies which score shall take this
|
||
|
# position but we want i-th position to have rank of score at that
|
||
|
# position, do this conversion
|
||
|
ranks = ranked_idx.clone().fill_(0)
|
||
|
for i in range(ranked_idx.size(0)):
|
||
|
for j in range(num_options):
|
||
|
ranks[i][ranked_idx[i][j]] = j
|
||
|
# convert from 0-99 ranks to 1-100 ranks
|
||
|
ranks += 1
|
||
|
ranks = ranks.view(batch_size, num_rounds, num_options)
|
||
|
return ranks
|
||
|
|
||
|
class SparseGTMetrics(object):
|
||
|
"""
|
||
|
A class to accumulate all metrics with sparse ground truth annotations.
|
||
|
These include Recall (@ 1, 5, 10), Mean Rank and Mean Reciprocal Rank.
|
||
|
"""
|
||
|
|
||
|
def __init__(self):
|
||
|
self._rank_list = []
|
||
|
self._rank_list_rnd = []
|
||
|
self.num_rounds = None
|
||
|
|
||
|
def observe(
|
||
|
self, predicted_scores: torch.Tensor, target_ranks: torch.Tensor
|
||
|
):
|
||
|
predicted_scores = predicted_scores.detach()
|
||
|
|
||
|
# shape: (batch_size, num_rounds, num_options)
|
||
|
predicted_ranks = scores_to_ranks(predicted_scores)
|
||
|
batch_size, num_rounds, num_options = predicted_ranks.size()
|
||
|
self.num_rounds = num_rounds
|
||
|
# collapse batch dimension
|
||
|
predicted_ranks = predicted_ranks.view(
|
||
|
batch_size * num_rounds, num_options
|
||
|
)
|
||
|
|
||
|
# shape: (batch_size * num_rounds, )
|
||
|
target_ranks = target_ranks.view(batch_size * num_rounds).long()
|
||
|
|
||
|
# shape: (batch_size * num_rounds, )
|
||
|
predicted_gt_ranks = predicted_ranks[
|
||
|
torch.arange(batch_size * num_rounds), target_ranks
|
||
|
]
|
||
|
self._rank_list.extend(list(predicted_gt_ranks.cpu().numpy()))
|
||
|
|
||
|
predicted_gt_ranks_rnd = predicted_gt_ranks.view(batch_size, num_rounds)
|
||
|
# predicted gt ranks
|
||
|
self._rank_list_rnd.append(predicted_gt_ranks_rnd.cpu().numpy())
|
||
|
|
||
|
def retrieve(self, reset: bool = True):
|
||
|
num_examples = len(self._rank_list)
|
||
|
if num_examples > 0:
|
||
|
# convert to numpy array for easy calculation.
|
||
|
__rank_list = torch.tensor(self._rank_list).float()
|
||
|
metrics = {
|
||
|
"r@1": torch.mean((__rank_list <= 1).float()).item(),
|
||
|
"r@5": torch.mean((__rank_list <= 5).float()).item(),
|
||
|
"r@10": torch.mean((__rank_list <= 10).float()).item(),
|
||
|
"mean": torch.mean(__rank_list).item(),
|
||
|
"mrr": torch.mean(__rank_list.reciprocal()).item()
|
||
|
}
|
||
|
# add round metrics
|
||
|
_rank_list_rnd = np.concatenate(self._rank_list_rnd)
|
||
|
_rank_list_rnd = _rank_list_rnd.astype(float)
|
||
|
r_1_rnd = np.mean(_rank_list_rnd <= 1, axis=0)
|
||
|
r_5_rnd = np.mean(_rank_list_rnd <= 5, axis=0)
|
||
|
r_10_rnd = np.mean(_rank_list_rnd <= 10, axis=0)
|
||
|
mean_rnd = np.mean(_rank_list_rnd, axis=0)
|
||
|
mrr_rnd = np.mean(np.reciprocal(_rank_list_rnd), axis=0)
|
||
|
|
||
|
for rnd in range(1, self.num_rounds + 1):
|
||
|
metrics["r_1" + "_round_" + str(rnd)] = r_1_rnd[rnd-1]
|
||
|
metrics["r_5" + "_round_" + str(rnd)] = r_5_rnd[rnd-1]
|
||
|
metrics["r_10" + "_round_" + str(rnd)] = r_10_rnd[rnd-1]
|
||
|
metrics["mean" + "_round_" + str(rnd)] = mean_rnd[rnd-1]
|
||
|
metrics["mrr" + "_round_" + str(rnd)] = mrr_rnd[rnd-1]
|
||
|
else:
|
||
|
metrics = {}
|
||
|
|
||
|
if reset:
|
||
|
self.reset()
|
||
|
return metrics
|
||
|
|
||
|
def reset(self):
|
||
|
self._rank_list = []
|
||
|
self._rank_list_rnd = []
|
||
|
|
||
|
class NDCG(object):
|
||
|
def __init__(self):
|
||
|
self._ndcg_numerator = 0.0
|
||
|
self._ndcg_denominator = 0.0
|
||
|
|
||
|
def observe(
|
||
|
self, predicted_scores: torch.Tensor, target_relevance: torch.Tensor
|
||
|
):
|
||
|
"""
|
||
|
Observe model output scores and target ground truth relevance and
|
||
|
accumulate NDCG metric.
|
||
|
|
||
|
Parameters
|
||
|
----------
|
||
|
predicted_scores: torch.Tensor
|
||
|
A tensor of shape (batch_size, num_options), because dense
|
||
|
annotations are available for 1 randomly picked round out of 10.
|
||
|
target_relevance: torch.Tensor
|
||
|
A tensor of shape same as predicted scores, indicating ground truth
|
||
|
relevance of each answer option for a particular round.
|
||
|
"""
|
||
|
predicted_scores = predicted_scores.detach()
|
||
|
|
||
|
# shape: (batch_size, 1, num_options)
|
||
|
predicted_scores = predicted_scores.unsqueeze(1)
|
||
|
predicted_ranks = scores_to_ranks(predicted_scores)
|
||
|
|
||
|
# shape: (batch_size, num_options)
|
||
|
predicted_ranks = predicted_ranks.squeeze(1)
|
||
|
batch_size, num_options = predicted_ranks.size()
|
||
|
|
||
|
k = torch.sum(target_relevance != 0, dim=-1)
|
||
|
|
||
|
# shape: (batch_size, num_options)
|
||
|
_, rankings = torch.sort(predicted_ranks, dim=-1)
|
||
|
# Sort relevance in descending order so highest relevance gets top rnk.
|
||
|
_, best_rankings = torch.sort(
|
||
|
target_relevance, dim=-1, descending=True
|
||
|
)
|
||
|
|
||
|
# shape: (batch_size, )
|
||
|
batch_ndcg = []
|
||
|
for batch_index in range(batch_size):
|
||
|
num_relevant = k[batch_index]
|
||
|
dcg = self._dcg(
|
||
|
rankings[batch_index][:num_relevant],
|
||
|
target_relevance[batch_index],
|
||
|
)
|
||
|
best_dcg = self._dcg(
|
||
|
best_rankings[batch_index][:num_relevant],
|
||
|
target_relevance[batch_index],
|
||
|
)
|
||
|
batch_ndcg.append(dcg / best_dcg)
|
||
|
|
||
|
self._ndcg_denominator += batch_size
|
||
|
self._ndcg_numerator += sum(batch_ndcg)
|
||
|
|
||
|
def _dcg(self, rankings: torch.Tensor, relevance: torch.Tensor):
|
||
|
sorted_relevance = relevance[rankings].cpu().float()
|
||
|
discounts = torch.log2(torch.arange(len(rankings)).float() + 2)
|
||
|
return torch.sum(sorted_relevance / discounts, dim=-1)
|
||
|
|
||
|
def retrieve(self, reset: bool = True):
|
||
|
if self._ndcg_denominator > 0:
|
||
|
metrics = {
|
||
|
"ndcg": float(self._ndcg_numerator / self._ndcg_denominator)
|
||
|
}
|
||
|
else:
|
||
|
metrics = {}
|
||
|
|
||
|
if reset:
|
||
|
self.reset()
|
||
|
return metrics
|
||
|
|
||
|
def reset(self):
|
||
|
self._ndcg_numerator = 0.0
|
||
|
self._ndcg_denominator = 0.0
|
||
|
|
||
|
class SparseGTMetricsParallel(object):
|
||
|
"""
|
||
|
A class to accumulate all metrics with sparse ground truth annotations.
|
||
|
These include Recall (@ 1, 5, 10), Mean Rank and Mean Reciprocal Rank.
|
||
|
"""
|
||
|
|
||
|
def __init__(self, gpu_rank):
|
||
|
self.rank_1 = 0
|
||
|
self.rank_5 = 0
|
||
|
self.rank_10 = 0
|
||
|
self.ranks = 0
|
||
|
self.reciprocal = 0
|
||
|
self.count = 0
|
||
|
self.gpu_rank = gpu_rank
|
||
|
self.img_ids = []
|
||
|
|
||
|
def observe(
|
||
|
self, img_id: list, predicted_scores: torch.Tensor, target_ranks: torch.Tensor
|
||
|
):
|
||
|
if img_id in self.img_ids:
|
||
|
return
|
||
|
else:
|
||
|
self.img_ids.append(img_id)
|
||
|
|
||
|
predicted_scores = predicted_scores.detach()
|
||
|
|
||
|
# shape: (batch_size, num_rounds, num_options)
|
||
|
predicted_ranks = scores_to_ranks(predicted_scores)
|
||
|
batch_size, num_rounds, num_options = predicted_ranks.size()
|
||
|
self.num_rounds = num_rounds
|
||
|
# collapse batch dimension
|
||
|
predicted_ranks = predicted_ranks.view(
|
||
|
batch_size * num_rounds, num_options
|
||
|
)
|
||
|
|
||
|
# shape: (batch_size * num_rounds, )
|
||
|
target_ranks = target_ranks.view(batch_size * num_rounds).long()
|
||
|
|
||
|
# shape: (batch_size * num_rounds, )
|
||
|
predicted_gt_ranks = predicted_ranks[
|
||
|
torch.arange(batch_size * num_rounds), target_ranks
|
||
|
]
|
||
|
|
||
|
self.rank_1 += (predicted_gt_ranks <= 1).sum().item()
|
||
|
self.rank_5 += (predicted_gt_ranks <= 5).sum().item()
|
||
|
self.rank_10 += (predicted_gt_ranks <= 10).sum().item()
|
||
|
self.ranks += predicted_gt_ranks.sum().item()
|
||
|
self.reciprocal += predicted_gt_ranks.float().reciprocal().sum().item()
|
||
|
self.count += batch_size * num_rounds
|
||
|
|
||
|
def retrieve(self):
|
||
|
if self.count > 0:
|
||
|
# retrieve data from all gpu
|
||
|
# define tensor on GPU, count and total is the result at each GPU
|
||
|
t = torch.tensor([self.rank_1, self.rank_5, self.rank_10, self.ranks, self.reciprocal, self.count], dtype=torch.float32, device=f'cuda:{self.gpu_rank}')
|
||
|
dist.barrier() # synchronizes all processes
|
||
|
dist.all_reduce(t, op=torch.distributed.ReduceOp.SUM,) # Reduces the tensor data across all machines in such a way that all get the final result.
|
||
|
t = t.tolist()
|
||
|
self.rank_1, self.rank_5, self.rank_10, self.ranks, self.reciprocal, self.count = t
|
||
|
|
||
|
# convert to numpy array for easy calculation.
|
||
|
metrics = {
|
||
|
"r@1": self.rank_1 / self.count,
|
||
|
"r@5": self.rank_5 / self.count,
|
||
|
"r@10": self.rank_10 / self.count,
|
||
|
"mean": self.ranks / self.count,
|
||
|
"mrr": self.reciprocal / self.count,
|
||
|
"tot_rnds": self.count,
|
||
|
}
|
||
|
|
||
|
else:
|
||
|
metrics = {}
|
||
|
|
||
|
return metrics
|
||
|
|
||
|
def get_count(self):
|
||
|
return int(self.count)
|
||
|
|
||
|
class NDCGParallel(NDCG):
|
||
|
def __init__(self, gpu_rank):
|
||
|
super(NDCGParallel, self).__init__()
|
||
|
self.gpu_rank = gpu_rank
|
||
|
self.img_ids = []
|
||
|
self.count = 0
|
||
|
|
||
|
def observe(
|
||
|
self, img_id: int, predicted_scores: torch.Tensor, target_relevance: torch.Tensor
|
||
|
):
|
||
|
"""
|
||
|
Observe model output scores and target ground truth relevance and
|
||
|
accumulate NDCG metric.
|
||
|
|
||
|
Parameters
|
||
|
----------
|
||
|
predicted_scores: torch.Tensor
|
||
|
A tensor of shape (batch_size, num_options), because dense
|
||
|
annotations are available for 1 randomly picked round out of 10.
|
||
|
target_relevance: torch.Tensor
|
||
|
A tensor of shape same as predicted scores, indicating ground truth
|
||
|
relevance of each answer option for a particular round.
|
||
|
"""
|
||
|
if img_id in self.img_ids:
|
||
|
return
|
||
|
else:
|
||
|
self.img_ids.append(img_id)
|
||
|
self.count += 1
|
||
|
|
||
|
super(NDCGParallel, self).observe(predicted_scores, target_relevance)
|
||
|
|
||
|
|
||
|
def retrieve(self):
|
||
|
if self._ndcg_denominator > 0:
|
||
|
# define tensor on GPU, count and total is the result at each GPU
|
||
|
t = torch.tensor([self._ndcg_numerator, self._ndcg_denominator, self.count], dtype=torch.float32, device=f'cuda:{self.gpu_rank}')
|
||
|
dist.barrier() # synchronizes all processes
|
||
|
dist.all_reduce(t, op=torch.distributed.ReduceOp.SUM,) # Reduces the tensor data across all machines in such a way that all get the final result.
|
||
|
t = t.tolist()
|
||
|
self._ndcg_numerator, self._ndcg_denominator, self.count = t
|
||
|
metrics = {
|
||
|
"ndcg": float(self._ndcg_numerator / self._ndcg_denominator)
|
||
|
}
|
||
|
else:
|
||
|
metrics = {}
|
||
|
return metrics
|
||
|
|
||
|
def get_count(self):
|
||
|
return int(self.count)
|