import numpy as np
import torch
import torch_geometric
from torch import Tensor
from torch_geometric.data import Data
class BipartiteData(Data):
def __inc__(self, key, value, *args, **kwargs):
if key == "edge_index":
return Tensor(
self.x_s.size(0)], [self.x_t.size(0)]]
[[# source and target (two classes of bipartite graph)
) return super().__inc__(key, value, *args, **kwargs)
def is_bipartite(self):
return True
Apprentissage de réseaux bipartites avec pytorch-geometric
L’objectif est de proposer une représentation latente des nœuds d’un réseau bipartite (ou de plusieurs) dans un espace euclidien. Un sous-objectif peut être de prédire des dyades manquantes (non observées) que ce soient des arêtes ou des non-arêtes. Nous allons utiliser des “variational graph auto-encoder”, il est possible d’aller consulter les tutoriels correspondant sur la page de pytorch-geometrics et celui présenté lors d’un précédent atelier State of the R. Dans ce document, nous nous concentrons sur l’aspect sous-échantillonnage du réseau permettant de créer les jeux de données d’apprentissage et d’entraînement.
L’apprentissage de la représentation latente se fait sur l’optimisation d’une perte, à savoir l’entropie croisée, de la prédiction d’arêtes et de non arêtes (1 ou 0 dans la matrice d’incidence correspondant au réseau). Cet ensemble d’arêtes et de non arêtes choisies en proportions égales s’appelle le jeu d’entraînement et est considéré connu pour effectuer les convolutions au sein des couches de GNN (graph neural network). L’évaluation de la performance se fait sur un autre sous-ensemble d’arêtes et de non arêtes, appelé le jeu de test et est inconnu lors de l’entraînement et n’est pas utilisé pour l’encodage (succession de couches de GNN projetant les nœuds dans l’espace latent). Un autre jeu de données du même type peut être créé comme jeu de validation.
Pour ce faire la fonction RandomLinkSplit(...)
de pytorch-geometric
permet de faire cette séparation en trois jeux de données (ensemble d’arêtes et non arêtes) d’un objet de type BipartiteData
qui n’est pas une classe nativement définie dans pytorch-geometric
mais c’est la recommandation trouvée en ligne pour définir ce genre de réseaux.
On définit ensuite un réseau bipartite à partir d’un réseau généré aléatoirement :
# Generate a random bipartite graph
= (10, 30) # 100 nodes in each partition
num_nodes = 50 # Total number of edges
num_edges = generate_random_graph(num_nodes, num_edges)
edge_index # Convert to PyTorch tensor
= torch.tensor(edge_index, dtype=torch.long)
edge_index # Create a BipartiteData object
= BipartiteData(
random_network ="Random Graph",
name=0,
id_net=torch.arange(start=0,end=num_nodes[0],dtype=torch.float).reshape(-1, 1),
x_s=torch.arange(start=0,end=num_nodes[1],dtype=torch.float).reshape(-1, 1),
x_t=edge_index,
edge_index=len(edge_index[0].unique())+len(edge_index[1].unique()),
num_nodes
)print(random_network)
BipartiteData(edge_index=[2, 50], name='Random Graph', id_net=0, x_s=[10, 1], x_t=[30, 1], num_nodes=36)
où
name
etid_net
permettent de renseigner un nom et un identifiant au réseau (utile particulièrement si on a plusieurs réseaux),x_s
etx_t
donnent les matrices de covariables sur les nœuds en ligne et en colonne respectivement,edge_index
contient une matrice à 2 lignes et dont le nombre de colonnes correspond au nombre d’arêtes,num_nodes
est le nombre de nœuds dans le réseau mais cela posera des problèmes comme nous le verrons.
À présent, nous allons pouvoir proposer un sous-échantillonnage en jeux d’entraînement, de validation et de test.
import torch_geometric.transforms as T
= T.RandomLinkSplit(
transform =0.2,
num_test=0.1,
num_val=True
split_labels
)
= transform(random_network) train_data, val_data, test_data
où
num_test
permet de choisir la proportion de données dans le jeu de test,num_val
permet de choisir la proportion de données dans le jeu de validation,split_labels
fixé à vrai permet de séparer en deux parties les arêtes et les non arêtes.
Si on regarde ce que contiennent les objets train_data, val_data et test_data, on a :
print(train_data)
print(val_data)
print(test_data)
BipartiteData(edge_index=[2, 35], name='Random Graph', id_net=0, x_s=[10, 1], x_t=[30, 1], num_nodes=36, pos_edge_label=[35], pos_edge_label_index=[2, 35], neg_edge_label=[35], neg_edge_label_index=[2, 35])
BipartiteData(edge_index=[2, 35], name='Random Graph', id_net=0, x_s=[10, 1], x_t=[30, 1], num_nodes=36, pos_edge_label=[5], pos_edge_label_index=[2, 5], neg_edge_label=[5], neg_edge_label_index=[2, 5])
BipartiteData(edge_index=[2, 40], name='Random Graph', id_net=0, x_s=[10, 1], x_t=[30, 1], num_nodes=36, pos_edge_label=[10], pos_edge_label_index=[2, 10], neg_edge_label=[10], neg_edge_label_index=[2, 10])
Chaque objet contient edge_index
qui est le même entre train_data
et val_data
et qui contient les arêtes considérées comme connues et qui servent à faire les convolutions dans les GNN pour l’entraînement et la validation. Pour le test, on ajoute les arêtes du jeu validation au jeu d’entraînement pour faire les convolutions.
L’attribut pos_edge_label_index donne des indices d’arêtes (sous-ensemble des arêtes existantes dans le réseau) et l’attribut neg_edge_label_index donne des indices de non arêtes.
Si on regarde
print(test_data.pos_edge_label_index)
print(test_data.neg_edge_label_index)
tensor([[ 7, 9, 5, 9, 4, 8, 4, 0, 0, 4],
[20, 18, 24, 5, 2, 3, 14, 5, 26, 28]])
tensor([[ 3, 12, 11, 7, 20, 15, 26, 10, 33, 4],
[ 4, 27, 15, 35, 17, 28, 34, 12, 13, 5]])
on s’aperçoit d’une incohérence sur les non-arêtes car nous obtenons des indices de nœuds strictement supérieurs aux nombres de nœuds en ligne ou en colonne… Cela vient du fait qu’il fait l’échantillonnage des non-arêtes sur la conversion en matrice d’adjacence du réseau bipartite, c’est-à-dire sur une matrice de taille la somme des nombres de nœuds en ligne et en colonne ayant des blocs diagonaux vides (car il n’y a pas d’interaction au sein de chaque type de nœuds). Il faut donc utiliser “correctement” la fonction permettant d’échantillonner des non-arêtes au sein de la matrice d’incidence ce que ne fait pas la fonction RandomLinkSplit
pour des graphes bipartites…
from torch_geometric.utils import negative_sampling
= negative_sampling(
neg_edge_index =train_data.edge_index,
edge_index=(train_data.x_s.size(0), train_data.x_t.size(0)),
num_nodes=train_data.pos_edge_label_index.size(1),
num_neg_samples
)print(neg_edge_index)
tensor([[ 2, 3, 5, 9, 3, 3, 9, 8, 8, 5, 9, 7, 9, 3, 9, 3, 5, 3,
3, 9, 9, 7, 4, 7, 0, 2, 9, 7, 5, 3, 8, 7, 7, 8, 1],
[28, 7, 8, 3, 11, 4, 12, 5, 16, 29, 17, 12, 24, 1, 10, 17, 9, 8,
18, 11, 21, 18, 15, 2, 25, 13, 23, 16, 28, 5, 27, 15, 3, 28, 4]])
Lors de l’optimisation de la perte sur le jeu d’entraînement, on utilise les mêmes arêtes que celles servant à la convolution et on ajoute des non arêtes (0 dans la matrice d’incidence) échantillonnées comme ci-dessus. Pour le calcul des métriques de validation, on fait de même avec les arêtes dans val_data
et en ajoutant des non-arêtes par rapport à celles utilisées lors de la convolution grâce à la fonction negative_sampling
.
À la fin de l’entraînement, on teste sur le jeu de test test_data
mais en utilisant les convolutions sur la réunions des arêtes contenues dans train_data
et val_data
. Les non arêtes du jeu de test doivent être choisies par rapport aux non arêtes du réseau complet.