from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import List, Union
import pandas as pd
import linchemin.IO.io as lio
from linchemin import settings
from linchemin.cgu.convert import converter
from linchemin.cgu.syngraph import (
BipartiteSynGraph,
MonopartiteMolSynGraph,
MonopartiteReacSynGraph,
)
from linchemin.cgu.syngraph_operations import merge_syngraph
from linchemin.cgu.translate import get_available_data_models
from linchemin.cheminfo.atom_mapping import get_available_mappers
from linchemin.interfaces.facade import facade
from linchemin.interfaces.utils_interfaces import get_ged_dict, get_parallelization_dict
from linchemin.interfaces.writers import SyngraphWriterFactory, write_syngraph
from linchemin.rem.clustering import get_available_clustering
from linchemin.rem.route_descriptors import get_available_descriptors
from linchemin.utilities import console_logger
"""
Module containing out-of-the-box "workflow", consisting of a sequence of facade functions.
The functionalities to be actually activated are selected by the user by setting the arguments of the function
'process_routes'.
"""
logger = console_logger(__name__)
class WorkflowError(Exception):
"""Base class for errors raised during workflow running"""
class NoValidRoute(WorkflowError):
"""Raised if no valid routes are found"""
class InvalidCasp(WorkflowError):
"""Raised if an unavailable CASP format is selected"""
[docs]
@dataclass
class WorkflowOutput:
"""
Class to store the outcome of the 'process_routes' function.
Attributes:
------------
routes_list: a list of routes as instances of a SynGraph subclass
descriptors: a pandas DataFrame containing the routes ids and the computed descriptors (returned only if
the 'compute_descriptors' functionality is activated)
clustering: a tuple with the output of the cluster algorithm and the Silhouette score (returned only if
the 'clustering' or the 'clustering_and_d_matrix' functionalities are activated)
clustered_descriptors: a pandas DataFrame containing the routes ids, the cluster labels and few descriptors
(returned only if the 'clustering' or the 'clustering_and_d_matrix' functionalities
are activated)
distance_matrix: a pandas DataFrame containing the GED distance matrix (returned only if the
'distance_matrix' or the 'clustering_and_d_matrix' functionalities are activated)
tree: an instances of a SynGraph subclass obtained from the merging of the input routes (returned only if
the 'merging' functionality is activated)
reaction_strings: a list of reaction strings (returned only if the 'extracting_reactions' functionality
is activated)
log: a dictionary containing the exceptions occurred during the process
"""
routes_list: list = field(default_factory=list)
descriptors: pd.DataFrame = field(default_factory=pd.DataFrame)
clustering: tuple = field(default_factory=tuple)
clustered_descriptors: pd.DataFrame = field(default_factory=pd.DataFrame)
distance_matrix: pd.DataFrame = field(default_factory=pd.DataFrame)
tree: Union[
BipartiteSynGraph, MonopartiteReacSynGraph, MonopartiteMolSynGraph, None
] = None
reaction_strings: list = field(default_factory=list)
log: dict = field(default_factory=dict)
# Functionalities factory
class WorkflowStep(ABC):
"""Abstract class for the WorkflowStep taking care of the single functionalities"""
info: str
@abstractmethod
def perform_step(self, data: dict, output: WorkflowOutput) -> WorkflowOutput:
pass
class TranslationStep(WorkflowStep):
"""Handler handling the translation functionality of facade
It translates the input list of graphs in syngraph objects
"""
info = "To translate routes"
casps = {
"ibmrxn": "ibm_retro",
"az": "az_retro",
"askcosv1": "askcosv1",
"askcosv2": "askcosv2",
"reaxys": "reaxys",
}
def perform_step(self, params: dict, output: WorkflowOutput):
print("Translating the routes in the input file to a list of SynGraph....")
all_routes = []
for file, casp in params["input"].items():
routes = lio.read_json(file)
if casp not in self.casps:
logger.error(
f"{casp} is not a valid casp name. Available casps are: {list(self.casps.keys())}"
)
raise InvalidCasp
syngraph_routes, meta = facade(
functionality="translate",
input_format=self.casps[casp],
routes=routes,
out_data_model=params["out_data_model"],
parallelization=params["parallelization"],
n_cpu=params["n_cpu"],
)
all_routes += syngraph_routes
output.log["_".join(["translation", file])] = meta
if len(all_routes) < 1:
logger.error("No valid routes were found. Workflow interrupted.")
raise NoValidRoute
output.routes_list = all_routes
return output
class DescriptorsStep(WorkflowStep):
"""Handler handling the descriptors functionality of facade
If 'compute_descriptors' is in 'functionalities', the selected descriptors are computed and written to the file
'descriptors.csv'
"""
info = "To compute routes descriptors"
def perform_step(self, params: dict, output: WorkflowOutput):
print("Computing the routes descriptors...")
descriptors, meta = facade(
functionality="routes_descriptors",
routes=output.routes_list,
descriptors=params["descriptors"],
)
output.descriptors = descriptors
output.log["compute_descriptors"] = meta
lio.dataframe_to_csv(descriptors, "descriptors.csv")
return output
class ClusteringAndDistanceMatrixStep(WorkflowStep):
"""Handler handling the clustering functionality of facade
If the 'clustering' argument is in 'functionalities',
the routes are clustered and a file 'cluster_metrics.csv'
with the cluster label, the number of steps and the
number of branches for each route is written.
"""
info = "To compute the distance matrix and clustering the routes"
def perform_step(self, params: dict, output: WorkflowOutput):
print("Clustering the routes...")
cluster_output, meta = facade(
functionality="clustering",
routes=output.routes_list,
clustering_method=params["clustering_method"],
ged_method=params["ged_method"],
ged_params=params["ged_params"],
save_dist_matrix=True,
compute_metrics=True,
parallelization=params["parallelization"],
n_cpu=params["n_cpu"],
)
if len(cluster_output) == 2:
clustering, metrics = cluster_output
else:
clustering = cluster_output
metrics = None
if cluster_output is None and metrics is None:
output.log["clustering_and_d_matrix"] = meta
else:
output.clustering = clustering[0]
output.clustered_descriptors = metrics
output.distance_matrix = clustering[2]
output.log["clustering_and_d_matrix"] = meta
lio.dataframe_to_csv(clustering[2], "distance_matrix.csv")
lio.dataframe_to_csv(metrics, "cluster_metrics.csv")
return output
class ClusteringStep(WorkflowStep):
"""Handler handling the clustering functionality of facade
If the 'clustering_dist_matrox' argument is in 'functionalities', the routes are clustered and a file
'cluster_metrics.csv' with the cluster label, the number of steps and the number of branches for each route is
written and the distance matrix is written in the 'distance_matrix.csv' file.
"""
info = "To clustering the routes"
def perform_step(self, params: dict, output: WorkflowOutput):
cluster_output, meta = facade(
functionality="clustering",
routes=output.routes_list,
clustering_method=params["clustering_method"],
ged_method=params["ged_method"],
ged_params=params["ged_params"],
compute_metrics=True,
parallelization=params["parallelization"],
n_cpu=params["n_cpu"],
)
if len(cluster_output) == 2:
clustering, metrics = cluster_output
else:
clustering = cluster_output
metrics = None
if clustering is None:
output.log["clustering_and_d_matrix"] = meta
else:
output.clustering = clustering[0]
output.clustered_descriptors = metrics
output.log["clustering"] = meta
lio.dataframe_to_csv(metrics, "cluster_metrics.csv")
return output
class DistanceMatrixStep(WorkflowStep):
"""Handler handling the distance matrix functionality of facade.
If the 'distance_matrix' argument is in 'functionalities', the distance matrix is computed and written in the
'distance_matrix.csv' file.
"""
info = "To compute the distance matrix"
def perform_step(self, params: dict, output: WorkflowOutput):
print("Computing the distance matrix...")
d_matrix, meta = facade(
functionality="distance_matrix",
routes=output.routes_list,
ged_method=params["ged_method"],
ged_params=params["ged_params"],
parallelization=params["parallelization"],
n_cpu=params["n_cpu"],
)
if not d_matrix:
output.log["distance_matrix"] = meta
else:
output.distance_matrix = d_matrix
output.log["distance_matrix"] = meta
lio.dataframe_to_csv(d_matrix, "distance_matrix.csv")
return output
class MergingStep(WorkflowStep):
"""Handler handling the merging functionality of facade.
If the 'merging' argument is in 'functionalities', the routes are merged and the obtained tree is written
in the 'tree' file. The file extension is determined by the 'output_format' argument.
"""
info = 'To merge the routes in a "SynTree"'
def perform_step(self, params: dict, output: WorkflowOutput):
print("Merging routes...")
merged = merge_syngraph(list_syngraph=output.routes_list)
merged_converted = converter(merged, params["out_data_model"])
output.tree = merged_converted
write_syngraph(
[merged_converted],
params["out_data_model"],
params["output_format"],
"tree",
)
return output
class ExtractingReactionsStep(WorkflowStep):
"""Handler handling the extract_reactions_strings functionality of facade.
If the 'extracting_reactions' argument is in 'functionalities', a list of reaction strings is extracted for
each route.
"""
info = "To extract reactions strings from a list of SynGraph objects"
def perform_step(self, params: dict, output: WorkflowOutput):
print("Extracting reaction strings...")
reactions, m = facade(
functionality="extract_reactions_strings", routes=output.routes_list
)
output.reaction_strings = reactions
lio.write_json(reactions, "reaction_strings.json")
return output
class AtomMappingStep(WorkflowStep):
"""Handler handling the atom_mapping functionality of facade.
If the 'atom_mapping' argument is in 'functionalities', reaction strings are extracted from the routes and
sent to the selected atom-to-atom mapping tools. The mapped reaction strings are then used to rebuild the routes
"""
info = "To map the reaction strings involved in the routes"
def perform_step(self, params: dict, output: WorkflowOutput):
print("Mapping the reactions")
mapped_routes, m = facade(
functionality="atom_mapping",
routes=output.routes_list,
mapper=params["mapper"],
)
mapped_routes_converted = [
converter(r, params["out_data_model"]) for r in mapped_routes
]
output.routes_list = mapped_routes_converted
return output
# Workflow Chain
class WorkflowHandler(ABC):
"""Abstract handler for the concrete handlers taking care of the single steps in the workflow"""
@abstractmethod
def execute(self, params: dict, output: WorkflowOutput):
pass
class WorkflowStarter(WorkflowHandler):
"""Concrete handler to perform the first step in the workflow"""
def execute(self, params: dict, output: WorkflowOutput):
"""Executes the first step of the workflow: translation of the routes in SynGraph objects and, if selected,
the reactions' atom mapping
"""
try:
output = self._get_translation(params, output)
if params["mapping"] is True:
output = self._get_mapped_routes(params, output)
return Executor().execute(params, output)
except NoValidRoute:
return None
@staticmethod
def _get_translation(params: dict, output: WorkflowOutput) -> WorkflowOutput:
"""Calls the translation step of the workflow"""
output = TranslationStep().perform_step(params, output)
return output
@staticmethod
def _get_mapped_routes(params: dict, output: WorkflowOutput) -> WorkflowOutput:
"""Calls the atom mapping step of the workflow"""
output = AtomMappingStep().perform_step(params, output)
return output
class Executor(WorkflowHandler):
"""Concrete handler performing the desired optional functionalities"""
steps_map = {
"compute_descriptors": {"value": DescriptorsStep, "info": DescriptorsStep.info},
"clustering_and_d_matrix": {
"value": ClusteringAndDistanceMatrixStep,
"info": ClusteringAndDistanceMatrixStep.info,
},
"clustering": {"value": ClusteringStep, "info": ClusteringStep.info},
"distance_matrix": {
"value": DistanceMatrixStep,
"info": DistanceMatrixStep.info,
},
"merging": {"value": MergingStep, "info": MergingStep.info},
"extracting_reactions": {
"value": ExtractingReactionsStep,
"info": ExtractingReactionsStep.info,
},
}
def execute(self, params: dict, output: WorkflowOutput):
"""Executes the selected functionalities"""
if params["functionalities"] is not None:
requests = params["functionalities"]
for request in requests:
if request not in self.steps_map:
logger.warning(
f'"{request}" is not a valid functionality: it will be ignored.'
"Available functionalities are: "
f"{list(self.steps_map.keys())}"
)
continue
output = self.steps_map[request]["value"]().perform_step(params, output)
return Finisher().execute(params=params, output=output)
def get_workflow_functions(self):
return {
f: additional_info["info"] for f, additional_info in self.steps_map.items()
}
class Finisher(WorkflowHandler):
"""Concrete handler to perform the last step in the workflow"""
def execute(self, params: dict, output: WorkflowOutput):
print("Writing the routes in the output file...")
write_syngraph(
syngraphs=output.routes_list,
out_data_model=params["out_data_model"],
output_format=params["output_format"],
file_name="routes",
)
return output
class WorkflowBuilder:
"""Class to start the chain calling the handler of the first step"""
@staticmethod
def initiate_workflow(params):
output = WorkflowOutput()
return WorkflowStarter().execute(params, output)
[docs]
def process_routes(
input_dict: dict,
output_format: str = settings.WORKFLOW.output_format,
mapping: bool = settings.WORKFLOW.mapping,
functionalities: Union[List[str], None] = settings.WORKFLOW.functionalities,
mapper: Union[str, None] = settings.FACADE.mapper,
out_data_model: str = settings.FACADE.out_data_model,
descriptors: List[str] = settings.FACADE.descriptors,
ged_method: str = settings.FACADE.ged_method,
ged_params: Union[dict, None] = settings.FACADE.ged_params,
clustering_method: Union[str, None] = settings.FACADE.clustering_method,
parallelization: bool = settings.FACADE.parallelization,
n_cpu: int = settings.FACADE.n_cpu,
) -> WorkflowOutput:
"""
Function process routed predicted by CASP tools: based on the input arguments, only the selected
functionalities are performed. The mandatory start and stop actions are (i) to read a json file containing the
routes predicted by a CASP tool, and (ii) to write the routes in an output file.
Possible additional actions are:
- performing the atom mapping of the reactions involved in the routes
- computing route descriptors
- computing the distance matrix between the routes
- clustering the routes
- merging the routes
- extracting the reaction strings from the routes
Parameters:
------------
input_dict: dict
The path to the input files and the relative casp names in the form {'file_path': 'casp_name'}
output_format: Optional[str]
The type of file to which the routes should be written (default 'json')
mapping: Optional[bool]
Whether the reactions involved in the routes should go through the atom-to-atom mapping (default False)
functionalities: Optional[Union[List[str], None]]
The list of the functionalities to be performed; if it is None, the input routes are read
and written to a file (default None)
mapper: Optional[str]
The name of the mapping tool to be used; if it is None, the mapping pipeline is used (default None)
out_data_model: Optional[str]
The data model for the output routes (default 'bipartite')
descriptors: Optional[Union[List[str], None]]
The list of the descriptos to be computed; if it is None, all the available are calculated (default None)
ged_method: Optional[str]
The method to be used for graph similarity calculations (default 'nx_ged')
ged_params: Optional[Union[dict, None]]
The dictionary with the parameters for specifying reaction and molecular fingerprints and similarity functions;
if it is None, the default values are used (default None)
clustering_method: Optional[Union[str, None]]
The clustering algorithm to be used for clustering the routes; if it is None, hdbscan
is used when there are more than 15 routes, Agglomerative Clustering otherwise (default None)
parallelization: Optional[bool]
Whether parallel computing should be used where possible (default False)
n_cpu: Optional[int]
The number of cpus to be used if parallelization is used (default 8)
Returns:
---------
output: WorkflowOutput
Its attributes store the results of the selected functionalities. The outcomes are also written to files.
Raises:
--------
NoValidRoute: if the input file(s) does not contain any valid route
KeyError: if a selected option is not available
Example:
---------
>>> output = process_routes({'ibmrxn_file.json': 'ibmrxn', # path to json file from ibmrxn
>>> 'az_file.json': 'az'}, # path to json file from az casp
>>> functionalities=[ # the functionalities to be activated
>>> 'compute_descriptors', # calculation of routes descriptors
>>> 'clustering_and_d_matrix', # calculation of distance matrix and clustering
>>> 'merging']) # merging of the routes to obtain a "tree"
"""
params = {
"input": input_dict,
"output_format": output_format,
"mapping": mapping,
"functionalities": functionalities,
"mapper": mapper,
"out_data_model": out_data_model,
"descriptors": descriptors,
"ged_method": ged_method,
"ged_params": ged_params,
"clustering_method": clustering_method,
"parallelization": parallelization,
"n_cpu": n_cpu,
}
output = WorkflowBuilder().initiate_workflow(params)
print("All done!")
return output
# Supporting functions and classes
# Workflow helper
[docs]
def get_workflow_options(verbose=False):
"""
Returns the available options for the 'process_routes' function.
Parameters:
-----------
verbose: Optional[bool]]
It indicates whether the information should be printed on the screen (default False)
Returns:
--------
available options: dict
The dictionary listing arguments, options and default values of the 'process_routes' function
Example:
--------
>>> options = get_workflow_options(verbose=True)
"""
if verbose:
return print_options()
else:
d = {
"input_dict": {
"name_or_flags": ["-input_dict"],
"default": None,
"required": True,
"type": dict,
"choices": None,
"help": 'Path to the input files and relative casp names in the form "file_path"="casp_name". Available CASP names are {}'.format(
list(TranslationStep.casps.keys())
),
"dest": "input_dict",
},
"output_format": {
"name_or_flags": ["-output_format"],
"default": settings.WORKFLOW.output_format,
"required": False,
"type": str,
"choices": SyngraphWriterFactory.list_writers(),
"help": "Format of the output file containing the routes",
"dest": "output_format",
},
"mapping": {
"name_or_flags": ["-mapping"],
"default": settings.WORKFLOW.mapping,
"required": False,
"type": bool,
"choices": [True, False],
"help": "Whether the atom-to-atom mapping of the reactions should be performed",
"dest": "mapping",
},
"mapper": {
"name_or_flags": ["-mapper"],
"default": settings.FACADE.mapper,
"required": False,
"type": str,
"choices": get_available_mappers(),
"help": "Which mapper should be used; if None, the mapping pipeline is used",
"dest": "mapper",
},
"functionalities": {
"name_or_flags": ["-functionalities"],
"default": settings.WORKFLOW.functionalities,
"required": False,
"type": list,
"choices": Executor().get_workflow_functions(),
"help": "List of functionalities to be performed",
"dest": "functionalities",
},
"out_data_model": {
"name_or_flags": ["-out_data_model"],
"default": settings.FACADE.out_data_model,
"required": False,
"type": str,
"choices": get_available_data_models(),
"help": "Data model of the output graphs",
"dest": "out_data_model",
},
"descriptors": {
"name_or_flags": ["-descriptors"],
"default": settings.FACADE.descriptors,
"required": False,
"type": list,
"choices": get_available_descriptors(),
"help": "List of descriptors to be calculated; if None, all descriptors are calculated",
"dest": "descriptors",
},
"clustering_method": {
"name_or_flags": ["-clustering_method"],
"default": settings.FACADE.clustering_method,
"required": False,
"type": str,
"choices": get_available_clustering(),
"help": "Method to be used to calculate the GED",
"dest": "ged_method",
},
}
d.update(get_ged_dict())
d.update(get_parallelization_dict())
return d
def print_options():
"""
Prints the available options for the workflow function.
"""
print("Workflow options and default:")
data = get_workflow_options()
for d, info in data.items():
print("argument:", d)
print(" info: ", info["help"])
print(" default: ", info["default"])
print(" available options: ", info["choices"])
return data