Reputation: 1
I am trying to create a custom loss function in PyTorch to better fit my learning task on a GNN (I am using the PyTorch Geometric framework). A quick overview of my learning problem: I have a heterogeneous graph (https://pytorch-geometric.readthedocs.io/en/latest/notes/heterogeneous.html) with two different types of nodes (let's call them A and B), and I want to predict targets (continuous values) attached to the links between A nodes and B nodes. I also have links between A nodes and A nodes, and between B nodes and B nodes; they are used in the convolutional layers, but I am not predicting on them. I first used Mean Squared Error and Mean Absolute Error, as they are commonly used to evaluate performance on regression tasks, and this works well.
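For context, here is a minimal sketch of the data layout (the node/edge type names and all the sizes are made up for illustration):

import torch
from torch_geometric.data import HeteroData

data = HeteroData()
data['A'].x = torch.randn(10, 16)   # 10 nodes of type A, 16 features each
data['B'].x = torch.randn(20, 16)   # 20 nodes of type B

# A->B edges carry the continuous targets I want to predict
src, dst = torch.randint(0, 10, (30,)), torch.randint(0, 20, (30,))
data['A', 'to', 'B'].edge_index = torch.stack([src, dst])
data['A', 'to', 'B'].y = torch.randn(30)

# A->A and B->B edges only feed the convolutional layers
data['A', 'to', 'A'].edge_index = torch.randint(0, 10, (2, 15))
data['B', 'to', 'B'].edge_index = torch.randint(0, 20, (2, 25))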
However, I want to try another approach where I am less interested in the values themselves than in the order of the predicted values. I therefore decided to use metrics that compare rankings: from a specific node, take all the outgoing edges, predict the outputs, rank those predictions (highest values first), rank the ground truths, and compare the two rankings - by ranking I mean the list of destination nodes sorted by output/target value. To do so I chose the Rank-Biased Overlap metric (defined here: https://doi.org/10.1145/1852102.1852106), for which Python packages exist - but I could also use any metric that evaluates unlabelled rankings, such as Kendall's correlation.
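To make this concrete, here is a minimal example of the package call I rely on (the rankings themselves are made up):

import rbo  # the Rank-Biased Overlap package mentioned above

ground_truth = [3, 1, 4, 0, 2]  # destination nodes sorted by target value
predicted = [3, 4, 1, 0, 2]     # destination nodes sorted by predicted value
similarity = rbo.RankingSimilarity(ground_truth, predicted).rbo()
print(similarity)  # 1.0 for identical rankings, lower as they diverge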
I would like to use this metric as a loss for my learning task, but there are some challenges to overcome (sorry in advance if I am confused about some technical aspects; I am not used to diving this deep into deep learning methods):
I am not sure how to implement this custom loss without breaking the gradients and making the backward pass infeasible. If I understood correctly, the loss value should be computed from the original input tensors (targets and predicted outputs) using only differentiable tensor operations?
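A small check that illustrates my worry: argsort returns integer indices, so the ranking carries no grad_fn and the computation graph is cut there.

import torch

pred = torch.randn(5, requires_grad=True)
ranking = torch.argsort(pred, descending=True)
print(ranking.grad_fn)  # None: no gradient flows through the ranking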
Below is the current state of my code. Returning two lists of tensors in evaluate_nodes() and initializing empty tensors to fill them value by value in the forward pass both seem pretty ugly, but I am not sure how to deal with the issues mentioned above.
Class RBOLoss
import torch
import rbo  # package implementing Rank-Biased Overlap

class RBOLoss(torch.nn.Module):
    def __init__(self, reduction='none') -> None:
        super(RBOLoss, self).__init__()
        if reduction not in ['mean', 'sum', 'none']:
            raise ValueError('RBO Loss: Reduction `{}` not implemented'.format(reduction))
        self.reduction = reduction

    def calculate_rbo(self, prediction, target):
        """
        We expect, for each node, two tensors `prediction` and `target` of the
        same shape, containing respectively the predicted output and the ground
        truth for each of the edges coming out of this node.
        """
        # Rank-Biased Overlap: rank the destination edges by value, highest first
        gt_ranking = torch.argsort(target, descending=True)
        pred_ranking = torch.argsort(prediction, descending=True)
        # The RBO package does not accept tensors, hence the numpy conversion
        return (rbo.RankingSimilarity(gt_ranking.numpy(), pred_ranking.numpy()).rbo(),
                gt_ranking.size(dim=0))

    def forward(self, predictions, targets):
        # One RBO value per node, weighted by the node's number of outgoing edges
        rbos = torch.empty(len(predictions), dtype=torch.float64, requires_grad=True)
        weights = torch.empty(len(targets), dtype=torch.float64, requires_grad=True)
        for idx, (pred, gt) in enumerate(zip(predictions, targets)):
            rbo_value, weight = self.calculate_rbo(pred, gt)
            rbos[idx] = 1 - rbo_value  # RBO = 1 is a perfect prediction
            weights[idx] = weight
        loss = torch.mul(rbos, weights)
        if self.reduction == 'mean':
            loss = torch.div(loss.sum(), weights.sum())
        elif self.reduction == 'sum':
            loss = loss.sum()
        return loss
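For comparison, this is the kind of fully tensor-based surrogate I suspect I may need instead: a RankNet-style pairwise logistic loss over the edges of one node (my own sketch, not part of my pipeline; pairwise_ranking_loss is a hypothetical helper). It optimizes pairwise ordering rather than RBO itself, so it is closer in spirit to Kendall's correlation.

import torch
import torch.nn.functional as F

def pairwise_ranking_loss(prediction, target):
    # diff[i, j] = prediction[i] - prediction[j], built with broadcasting only
    pred_diff = prediction.unsqueeze(1) - prediction.unsqueeze(0)
    # mask[i, j] = 1 where edge i should rank above edge j
    mask = (target.unsqueeze(1) > target.unsqueeze(0)).float()
    # softplus(-(p_i - p_j)) penalizes pairs predicted in the wrong order
    losses = F.softplus(-pred_diff) * mask
    return losses.sum() / mask.sum().clamp(min=1)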
Code snippet in Model
def evaluate_nodes(self, x_dict, edge_index_dict, edge_label_index, targets):
    # Compute outputs and collect ground truths node by node - for ranking purposes
    outputs, ground_truths = [], []
    for node_idx in edge_label_index[0].unique():
        # Select every edge whose source is the current node
        sel = torch.where(edge_label_index[0] == node_idx)
        outputs.append(self.forward(x_dict, edge_index_dict, edge_label_index[:, sel[0]]))
        ground_truths.append(targets[sel[0]])
    return outputs, ground_truths
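For clarity, a toy run of the per-node selection used above (the indices are made up):

import torch

edge_label_index = torch.tensor([[0, 0, 1, 1, 1],
                                 [4, 2, 3, 0, 1]])
sel = torch.where(edge_label_index[0] == 1)
print(edge_label_index[:, sel[0]])  # the three edges leaving source node 1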
Code snippet in the main pipeline (model.inspect(data) returns all the edges of the link type I am interested in)
def train_rbo(data, optimizer):
    model.train()
    optimizer.zero_grad()  # clear gradients left over from the previous step
    target = model.inspect(data).y.float().view(-1)
    predictions, ground_truths = model.evaluate_nodes(
        data.x_dict, data.edge_index_dict, model.inspect(data).edge_index, target)
    criterion = RBOLoss(reduction='mean')
    loss = criterion(predictions, ground_truths)
    loss.backward()
    optimizer.step()
    return loss.item()
Upvotes: 0
Views: 846