import abc
from typing import List, Union
import hdbscan
import numpy as np
import pandas as pd
from sklearn.cluster import AgglomerativeClustering
from sklearn.metrics import silhouette_score
from linchemin import settings
from linchemin.cgu.syngraph import BipartiteSynGraph, MonopartiteReacSynGraph
from linchemin.rem.graph_distance import compute_distance_matrix
from linchemin.rem.route_descriptors import descriptor_calculator
from linchemin.utilities import console_logger
"""
Module containing classes and functions to compute
the clustering of routes based on the distance matrix.
"""
# Module-level logger shared by every class and function below; it is created
# through linchemin's `console_logger` helper and named after this module.
logger = console_logger(__name__)
class ClusteringError(Exception):
    """Root of the exception hierarchy for clustering runs that cannot succeed.

    Catching this type catches every clustering-specific failure raised
    by this module.
    """
class OnlyNoiseClustering(ClusteringError):
    """Signals that the clustering algorithm labelled every route as noise."""
class NoClustering(ClusteringError):
    """Signals that the clustering algorithm produced no result at all."""
class UnavailableClusteringAlgorithm(ClusteringError):
    """Signals that the requested clustering algorithm is not an available one."""
class SingleRouteClustering(ClusteringError):
    """Signals that fewer than 2 routes were given, so clustering is impossible."""
class ClusterCalculator(metaclass=abc.ABCMeta):
    """Abstract interface that every clustering algorithm wrapper implements.

    Concrete subclasses provide `get_clustering`, which receives the routes'
    distance matrix and returns the outcome of the clustering.
    """

    @abc.abstractmethod
    def get_clustering(
        self, dist_matrix: pd.DataFrame, save_dist_matrix: bool, **kwargs
    ) -> tuple:
        """
        To apply the clustering algorithm to the provided distance matrix

        Parameters:
        ------------
        dist_matrix: pd.DataFrame
            The symmetric distance matrix for the routes
        save_dist_matrix: bool
            Whether the distance matrix should be returned as an output
        kwargs:
            Possible additional arguments specific for each clustering algorithm

        Returns:
        ---------
        tuple
            In the implementations of this module: (clustering, score) or,
            when save_dist_matrix is True, (clustering, score, dist_matrix)
        """
class HdbscanClusterCalculator(ClusterCalculator):
    """ClusterCalculator implementation that delegates to the Hdbscan algorithm."""

    def get_clustering(self, dist_matrix, save_dist_matrix, **kwargs):
        """Cluster the routes with Hdbscan on the precomputed distance matrix.

        Parameters:
        ------------
        dist_matrix: pd.DataFrame
            The distance matrix for the routes
        save_dist_matrix: bool
            If exactly True, the distance matrix is appended to the output
        kwargs:
            Optional 'min_cluster_size'; when absent, the value stored in
            settings.CLUSTERING.min_cluster_size is used

        Returns:
        ---------
        tuple
            (clustering, silhouette score) or
            (clustering, silhouette score, dist_matrix)

        Raises:
        --------
        NoClustering: if the fit call returns None
        OnlyNoiseClustering: if no route received the label 0
        """
        default_size = settings.CLUSTERING.min_cluster_size
        cluster_size = kwargs.get("min_cluster_size", default_size)

        model = hdbscan.HDBSCAN(min_cluster_size=cluster_size, metric="precomputed")
        clustering = model.fit(dist_matrix.to_numpy(dtype=float))

        if clustering is None:
            logger.error("The clustering algorithm did not return any result.")
            raise NoClustering

        found_cluster = 0 in clustering.labels_
        if not found_cluster:
            # hdbscan: with less than 15 datapoints, only noise is found
            logger.error(
                "Hdbscan found only noise. "
                "This can occur if less than 15 routes were given"
            )
            raise OnlyNoiseClustering

        s_score = compute_silhouette_score(dist_matrix, clustering.labels_)
        print(f"The Silhouette score is {round(s_score, 3):.3f}")

        # Strict identity check: only the literal True adds the matrix
        if save_dist_matrix is True:
            return clustering, s_score, dist_matrix
        return clustering, s_score
class AgglomerativeClusterCalculator(ClusterCalculator):
    """ClusterCalculator implementation that delegates to Agglomerative Clustering."""

    def get_clustering(self, dist_matrix, save_dist_matrix, **kwargs):
        """Cluster the routes with Agglomerative Clustering.

        The number of clusters is the one selected by
        optimize_agglomerative_cluster.

        Parameters:
        ------------
        dist_matrix: pd.DataFrame
            The distance matrix for the routes
        save_dist_matrix: bool
            If exactly True, the distance matrix is appended to the output
        kwargs:
            Optional 'linkage'; when absent, the value stored in
            settings.CLUSTERING.linkage is used

        Returns:
        ---------
        tuple
            (clustering, silhouette score) or
            (clustering, silhouette score, dist_matrix)

        Raises:
        --------
        NoClustering: if the optimization returns no clustering
        OnlyNoiseClustering: if no route received the label 0
        """
        default_linkage = settings.CLUSTERING.linkage
        linkage = kwargs.get("linkage", default_linkage)

        optimized = optimize_agglomerative_cluster(dist_matrix, linkage)
        clustering, s_score, best_n_cluster = optimized

        if clustering is None:
            logger.error("The clustering algorithm did not return any result.")
            raise NoClustering

        found_cluster = 0 in clustering.labels_
        if not found_cluster:
            logger.error("The algorithm found only noise.")
            raise OnlyNoiseClustering

        print(
            f"The number of clusters with the best Silhouette score is {best_n_cluster}"
        )
        print(f"The Silhouette score is {round(s_score, 3):.3f}")

        # Strict identity check: only the literal True adds the matrix
        if save_dist_matrix is True:
            return clustering, s_score, dist_matrix
        return clustering, s_score
class ClusterFactory:
    """Definition of the Cluster Factory to give access to the clustering algorithms.

    Attributes:
    ------------
    available_clustering_algorithms: dict
        It maps the strings representing the 'name' of a clustering algorithm
        to a dictionary holding the matching ClusterCalculator subclass
        ('value') and a short description ('info')
    """

    available_clustering_algorithms = {
        "hdbscan": {
            "value": HdbscanClusterCalculator,
            "info": "HDBscan algorithm. Not working with less than 15 routes",
        },
        "agglomerative_cluster": {
            "value": AgglomerativeClusterCalculator,
            "info": "Agglomerative Clustering algorithm. "
            "The number of clusters is optimized "
            "computing the silhouette score",
        },
    }

    def select_clustering_algorithms(
        self,
        syngraphs: list,
        ged_method: str,
        clustering_method: str,
        ged_params=None,
        save_dist_matrix=False,
        parallelization=False,
        n_cpu=None,
        **kwargs,
    ):
        """Compute the distance matrix and run the selected clustering on it.

        Parameters:
        ------------
        syngraphs: list
            The routes to be clustered
        ged_method: str
            The algorithm to be used for the distance calculations
        clustering_method: str
            A key of available_clustering_algorithms
        ged_params: optional
            Parameters forwarded to the distance calculation (default None)
        save_dist_matrix: bool
            Forwarded to the calculator's get_clustering (default False)
        parallelization: bool
            Forwarded to the distance calculation (default False)
        n_cpu: optional
            Forwarded to the distance calculation (default None)
        kwargs:
            Algorithm-specific options forwarded to get_clustering

        Returns:
        ---------
        The output of the selected calculator's get_clustering

        Raises:
        --------
        UnavailableClusteringAlgorithm: if clustering_method is not available
        """
        # Validate the name first, so that a typo does not cost a full
        # distance-matrix computation.
        if clustering_method not in self.available_clustering_algorithms:
            available = ", ".join(self.available_clustering_algorithms)
            message = (
                f"Invalid clustering algorithm '{clustering_method}'. "
                f"Available algorithms are: {available}"
            )
            logger.error(message)
            raise UnavailableClusteringAlgorithm(message)

        calculator_class = self.available_clustering_algorithms[clustering_method][
            "value"
        ]
        cluster_method = calculator_class()
        dist_matrix = self.get_distance_matrix(
            syngraphs, ged_method, ged_params, parallelization, n_cpu
        )
        return cluster_method.get_clustering(dist_matrix, save_dist_matrix, **kwargs)

    @staticmethod
    def get_distance_matrix(
        syngraphs: list,
        ged_method: str,
        ged_params: dict,
        parallelization: bool,
        n_cpu: int,
    ):
        """To get the ged matrix for the input graphs.

        Thin wrapper that forwards all its arguments, unchanged and in the
        same order, to compute_distance_matrix and returns its result.
        """
        return compute_distance_matrix(
            syngraphs, ged_method, ged_params, parallelization, n_cpu
        )
def clusterer(
    syngraphs: List[Union[MonopartiteReacSynGraph, BipartiteSynGraph]],
    ged_method: str,
    clustering_method: str,
    ged_params: Union[dict, None] = None,
    save_dist_matrix: bool = False,
    parallelization: bool = False,
    n_cpu: Union[int, None] = None,
    **kwargs,
) -> tuple:
    """
    To cluster a list of SynGraph objects based on their graph edit distance

    Parameters:
    ------------
    syngraphs: List[Union[MonopartiteReacSynGraph, BipartiteSynGraph]]
        The routes to be clustered
    ged_method: str
        The algorithm to be used for GED calculations
    clustering_method: str
        The clustering algorithm to be used
    ged_params: Union[dict, None]
        It contains the optional parameters for ged calculations; if it is
        not provided, the default parameters are used (default None)
    save_dist_matrix: Optional[bool]
        Whether the distance matrix should be saved and returned as output
        (default False)
    parallelization: Optional[bool]
        Whether parallelization should be used for computing distance matrix
        (default False)
    n_cpu: Union[int, None]
        If parallelization is activated, it indicates the number of CPUs to
        be used (default 'mp.cpu_count()')
    **kwargs:
        The optional parameters specific of the selected clustering algorithm

    Returns:
    ---------
    clustering, score, (dist_matrix): tuple
        The clustering algorithm output, the silhouette score and the
        distance matrix (save_dist_matrix=True)

    Raises:
    --------
    SingleRouteClustering: if the input list contains less than 2 routes
    UnavailableClusteringAlgorithm: if the selected clustering algorithm is
        not available

    Example:
    ---------
    >>> graph = json.loads(open('az_file.json').read())
    >>> syngraphs = [translator('az_retro', g, 'syngraph',
    ...                         out_data_model='monopartite_reactions')
    ...              for g in graph]
    >>> cluster1, score1 = clusterer(syngraphs,
    ...                              ged_method='nx_optimized_ged',
    ...                              clustering_method='agglomerative_cluster')
    """
    # A distance matrix over fewer than two routes carries no information,
    # so bail out before any expensive computation starts.
    if len(syngraphs) < 2:
        message = "Less than 2 routes were found: clustering not possible"
        logger.error(message)
        raise SingleRouteClustering(message)

    clustering_calculator = ClusterFactory()
    return clustering_calculator.select_clustering_algorithms(
        syngraphs,
        ged_method,
        clustering_method,
        ged_params,
        save_dist_matrix,
        parallelization,
        n_cpu,
        **kwargs,
    )
def compute_silhouette_score(
    dist_matrix: Union[np.ndarray, pd.DataFrame], clusterer_labels
) -> float:
    """To compute the silhouette score for the clustering of a distance matrix.

    Parameters:
    ------------
    dist_matrix: Union[np.ndarray, pd.DataFrame]
        The distance matrix; it is handed to scikit-learn's silhouette_score
        as a precomputed distance matrix. Within this module it is called
        with the pandas DataFrame of route distances.
    clusterer_labels:
        The labels assigned by a clustering algorithm, one per row of
        dist_matrix

    Returns:
    ---------
    score: float
        The silhouette score
    """
    return silhouette_score(dist_matrix, clusterer_labels, metric="precomputed")
def optimize_agglomerative_cluster(
    dist_matrix: Union[np.ndarray, pd.DataFrame], linkage: str
) -> tuple:
    """
    To optimize the number of clusters for the AgglomerativeClustering method.

    Every candidate number of clusters from 2 up to min(4, n_routes - 1) is
    tried, and the clustering with the highest silhouette score is kept.

    Parameters:
    ------------
    dist_matrix: Union[np.ndarray, pd.DataFrame]
        The square distance matrix
    linkage: str
        The type of linkage to be used in the clustering

    Returns:
    ---------
    tuple:
        best_clustering: the output of the clustering algorithm with the
            best silhouette score
        max_score: a float indicating the silhouette score relative to the
            best_clustering
        best_n_cluster: an integer indicating the number of clusters used to
            get the best silhouette score

        If there is no candidate number of clusters (fewer than 3 routes),
        (None, -1.0, None) is returned.
    """
    # The numeric matrix is identical for every candidate: convert it once
    # instead of once per iteration. np.asarray accepts both DataFrames and
    # plain arrays.
    distances = np.asarray(dist_matrix, dtype=float)

    best_clustering = None
    max_score = -1.0
    best_n_cluster = None
    for n_cluster in range(2, min(5, len(distances))):
        clustering = AgglomerativeClustering(
            n_clusters=n_cluster, metric="precomputed", linkage=linkage
        ).fit(distances)
        score = compute_silhouette_score(dist_matrix, clustering.labels_)
        # Strict '>' keeps the smallest number of clusters on ties
        if score > max_score:
            max_score = score
            best_clustering = clustering
            best_n_cluster = n_cluster
    return best_clustering, max_score, best_n_cluster
def get_clustered_routes_metrics(
    syngraphs: List[Union[MonopartiteReacSynGraph, BipartiteSynGraph]],
    clustering_output,
) -> pd.DataFrame:
    """
    To compute the metrics of the routes in the input list grouped by cluster.

    Parameters:
    -----------
    syngraphs: List[Union[MonopartiteReacSynGraph, BipartiteSynGraph]]
        The list of SynGraph for which the metrics should be computed; the
        i-th graph is the one the i-th clustering label refers to
    clustering_output:
        the output of a clustering algorithm, exposing a `labels_` attribute

    Returns:
    ---------
    df1: pd.DataFrame
        The dataframe of computed metrics, with columns
        'routes_id', 'cluster', 'n_steps' and 'n_branch'
    """
    col = ["routes_id", "cluster", "n_steps", "n_branch"]
    # asarray makes the element-wise comparison below valid even if the
    # labels come as a plain sequence.
    labels = np.asarray(clustering_output.labels_)

    frames = []
    for k in set(labels):
        cluster_routes = np.flatnonzero(labels == k).tolist()
        graphs = [syngraphs[i] for i in cluster_routes]
        frames.append(populate_metric_df(graphs, col, k))

    # pd.concat refuses an empty list: without labels, return the empty table
    if not frames:
        return pd.DataFrame(columns=col)
    # A single concatenation avoids re-copying the growing table per cluster
    return pd.concat(frames, ignore_index=True)
def populate_metric_df(
    graphs: List[Union[MonopartiteReacSynGraph, BipartiteSynGraph]],
    columns: list,
    k: int,
) -> pd.DataFrame:
    """Build the metrics table for the routes belonging to cluster k.

    Each graph contributes one row holding its name ('routes_id'), its
    'nr_steps' descriptor ('n_steps'), its 'nr_branches' descriptor
    ('n_branch') and the cluster label k ('cluster').
    """
    metrics = pd.DataFrame(columns=columns)
    for row, route in enumerate(graphs):
        steps = descriptor_calculator(route, "nr_steps")
        name = route.name
        # Cells are filled one by one, in this order, on a growing table
        metrics.loc[row, "routes_id"] = name
        metrics.loc[row, "n_steps"] = steps
        branches = descriptor_calculator(route, "nr_branches")
        metrics.loc[row, "n_branch"] = branches
        metrics.loc[row, "cluster"] = k
    return metrics
def get_available_clustering() -> dict:
    """Map each available clustering algorithm name to its 'info' description."""
    registry = ClusterFactory.available_clustering_algorithms
    summary = {}
    for name, details in registry.items():
        summary[name] = details["info"]
    return summary