"""Runs the alignment and connection process, input and output validation and produces reports."""
from datetime import datetime
from typing import List
from pandas import DataFrame
from onto_merger.alignment import hierarchy_utils, merge_utils
from onto_merger.alignment.alignment_manager import AlignmentManager
from onto_merger.alignment.hierarchy_utils import HierarchyManager
from onto_merger.alignment_config.validator import validate_alignment_configuration
from onto_merger.analyser import analysis_utils, pandas_profiler
from onto_merger.analyser.report_analyser import ReportAnalyser
from onto_merger.data.constants import (
DIRECTORY_DOMAIN_ONTOLOGY,
DIRECTORY_INPUT,
DIRECTORY_INTERMEDIATE,
)
from onto_merger.data.data_manager import DataManager
from onto_merger.data.dataclasses import (
DataRepository,
NamedTable,
RuntimeData,
convert_runtime_steps_to_named_table,
format_datetime,
)
from onto_merger.data_testing.ge_runner import GERunner
from onto_merger.logger.log import setup_logger
from onto_merger.report import report_generator
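

# Typical usage (a minimal sketch; the folder path below is a hypothetical
# placeholder for a project directory containing the alignment config and
# input tables):
#
#   pipeline = Pipeline(project_folder_path="/path/to/project")
#   pipeline.run_alignment_and_connection_process()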
class Pipeline:
    """Runs the alignment and connection process: validates inputs and outputs,
    aligns and connects the nodes, and produces the analysis reports."""

    # The data repository that stores the input and output tables with their
    # corresponding names (types); instantiated per Pipeline in __init__ so
    # instances do not share mutable state.
    _data_repo: DataRepository

    def __init__(self, project_folder_path: str) -> None:
        """Initialise the Pipeline class.

        :param project_folder_path: The directory path where the project inputs are
            stored.
        """
        self._data_repo = DataRepository()
        self._project_folder_path = DataManager.get_absolute_path(project_folder_path)
        self._short_project_name = self._project_folder_path.split("/")[-1]
        self._data_manager = DataManager(project_folder_path=self._project_folder_path)
        self._alignment_config = self._data_manager.load_alignment_config()
        self.logger = setup_logger(module_name=__name__, file_name=self._data_manager.get_log_file_path())
        self._alignment_priority_order: List[str] = []
        self._runtime_data: List[RuntimeData] = []

    def run_alignment_and_connection_process(self) -> None:
        """Run the alignment and connectivity process, validate inputs and outputs, and produce the analysis."""
self.logger.info("Started running alignment and connection process for " + f"'{self._short_project_name}'")
# (1) VALIDATE CONFIG
self._validate_alignment_config()
# (2) LOAD AND CHECK INPUT DATA
self._process_input_data()
# (3) RUN ALIGNMENT & POST PROCESSING
self._align_nodes()
self._post_process_alignment_output()
# (4) RUN CONNECTIVITY & POST PROCESSING
self._connect_nodes()
# (5) FINALISE OUTPUTS
self._finalise_outputs()
# (6) VALIDATE & PROFILE: intermediate & output data
self._validate_and_profile_dataset(
data_origin=DIRECTORY_INTERMEDIATE,
data_runtime_name=DIRECTORY_INTERMEDIATE,
tables=self._data_repo.get_intermediate_tables()
)
self._validate_and_profile_dataset(
data_origin=DIRECTORY_DOMAIN_ONTOLOGY,
data_runtime_name="output",
tables=self._data_repo.get_domain_tables()
)
# (7) PRODUCE ANALYSIS & REPORT
self._produce_report()
self.logger.info("Finished running alignment and connection process for " + f"'{self._short_project_name}'")

    def _validate_alignment_config(self) -> None:
        """Run the alignment configuration JSON schema validator.

        Raises an exception if the config is invalid.
        """
self.logger.info("Started validating alignment config...")
start_date_time = datetime.now()
config_json_is_valid = validate_alignment_configuration(alignment_config=self._alignment_config.as_dict)
        if not config_json_is_valid:
            raise Exception("The alignment config failed the JSON schema validation.")
self._record_runtime(start_date_time=start_date_time, task_name="VALIDATE CONFIG")
self.logger.info("Finished validating alignment config.")

    def _process_input_data(self) -> None:
        """Load, preprocess, profile and validate the input data.

        Raises an exception if the inputs are invalid (missing or failing data tests).
        Results (loaded tables) are stored in the data repository.
        """
self.logger.info("Started processing input data...")
# load and preprocess input tables: add namespaces for downstream processing
self._data_repo.update(
tables=analysis_utils.add_namespace_column_to_loaded_tables(tables=self._data_manager.load_input_tables())
)
# profile input tables
results_df = self._validate_and_profile_dataset(
data_origin=DIRECTORY_INPUT,
data_runtime_name=DIRECTORY_INPUT,
tables=self._data_repo.get_input_tables()
)
errors = results_df["nb_failed_validations"].sum()
if errors > 0:
self.logger.error(f"The INPUT data validation found {errors} errors. Terminating process. "
+ "Please resolve the errors, or force skipping errors in the config (see report "
+ f"'{self._data_manager.get_ge_data_docs_index_path_for_input()}').")
if self._alignment_config.base_config.force_through_failed_validation is False:
raise Exception
else:
self.logger.info("Process will carry on due 'force_through_failed_validation' is ON")
self.logger.info("Finished processing input data.")

    def _align_nodes(self) -> None:
        """Run the alignment process.

        Results (merge table and alignment steps) are stored in the data repository.
        """
self.logger.info("Started aligning nodes...")
start_date_time = datetime.now()
alignment_results, source_alignment_order = AlignmentManager(
alignment_config=self._alignment_config,
data_repo=self._data_repo,
data_manager=self._data_manager,
).align_nodes()
self._data_repo.update(tables=alignment_results.get_intermediate_tables())
self._data_manager.save_tables(tables=alignment_results.get_intermediate_tables())
self._alignment_priority_order.extend(source_alignment_order)
self._record_runtime(start_date_time=start_date_time, task_name="ALIGNMENT")
self.logger.info("Finished aligning nodes.")

    def _post_process_alignment_output(self) -> None:
        """Run the merge aggregation process (so merge targets are only canonical IDs).

        Results (aggregated merges) are stored in the data repository.
        """
self.logger.info("Started aggregating merges...")
start_date_time = datetime.now()
tables = merge_utils.post_process_alignment_results(
data_repo=self._data_repo,
seed_id=self._alignment_config.base_config.seed_ontology_name,
alignment_priority_order=self._alignment_priority_order
)
self._data_repo.update(tables=tables)
self._data_manager.save_tables(tables=tables)
self._record_runtime(start_date_time=start_date_time, task_name="ALIGNMENT postprocessing")
self.logger.info("Finished aggregating merges.")

    def _connect_nodes(self) -> None:
        """Run the connectivity process to produce the domain ontology hierarchy.

        Results (hierarchy edges) are stored in the data repository.
        """
self.logger.info("Started connecting nodes...")
start_date_time = datetime.now()
self._data_repo.update(
tables=HierarchyManager(data_manager=self._data_manager).connect_nodes(
alignment_config=self._alignment_config,
source_alignment_order=self._alignment_priority_order,
data_repo=self._data_repo,
)
)
self._data_repo.update(
tables=hierarchy_utils.post_process_connectivity_results(
data_repo=self._data_repo,
)
)
self._record_runtime(start_date_time=start_date_time, task_name="CONNECTIVITY")
self.logger.info("Finished connecting nodes.")

    def _finalise_outputs(self) -> None:
        """Produce the final merged ontology and pre-process tables for validation.

        Results are stored in the data repository.
        """
self.logger.info("Started finalising outputs...")
start_date_time = datetime.now()
# add NS to all outputs
self._data_repo.update(
tables=analysis_utils.add_namespace_column_to_loaded_tables(
tables=self._data_repo.get_intermediate_tables())
)
# save all outputs
self._data_manager.save_tables(tables=self._data_repo.get_intermediate_tables())
# save final tables to domain ontology folder
domain_tables = self._data_manager.produce_domain_ontology_tables(data_repo=self._data_repo)
self._data_repo.update(tables=domain_tables)
self._data_manager.save_domain_ontology_tables(tables=domain_tables)
self._record_runtime(start_date_time=start_date_time, task_name="FINALISING OUTPUTS")
self.logger.info("Finished finalising outputs.")

    def _validate_and_profile_dataset(
        self, data_origin: str, data_runtime_name: str, tables: List[NamedTable]
    ) -> DataFrame:
        """Profile and validate a dataset.

        :param data_origin: The dataset origin (e.g. input, intermediate, domain
            ontology) used to select the applicable data tests.
        :param data_runtime_name: The dataset name used in log messages and runtime records.
        :param tables: The tables to profile and validate.
        :return: The data test results.
        """
self.logger.info(f"Started validating {data_runtime_name} data...")
        # profile the tables
start_date_time = datetime.now()
pandas_profiler.profile_tables(tables=tables, data_manager=self._data_manager)
self._record_runtime(start_date_time=start_date_time, task_name=f"PROFILING {data_runtime_name} DATA")
# run data tests
start_date_time = datetime.now()
results_df = GERunner(
alignment_config=self._alignment_config,
ge_base_directory=self._data_manager.get_data_tests_path(),
data_manager=self._data_manager,
).run_ge_tests(named_tables=tables, data_origin=data_origin)
self._record_runtime(start_date_time=start_date_time, task_name=f"VALIDATION {data_runtime_name} DATA")
self.logger.info(f"Finished validating {data_runtime_name} data.")
return results_df

    def _produce_report(self) -> None:
        """Run the alignment and connectivity evaluation process and produce the HTML report."""
self.logger.info("Started creating report....")
run_time_table = convert_runtime_steps_to_named_table(steps=self._runtime_data)
self._data_repo.update(table=run_time_table)
self._data_manager.save_table(table=run_time_table)
# move data docs to report folder
self._data_manager.move_data_docs_to_reports()
# run analysis & produce report
ReportAnalyser(
alignment_config=self._alignment_config,
data_repo=self._data_repo,
data_manager=self._data_manager,
runtime_data=self._runtime_data
).produce_report_data()
report_path = report_generator.produce_report(data_manager=self._data_manager)
self.logger.info(f"Finished producing HTML report (saved to '{report_path}'.")

    def _record_runtime(self, start_date_time: datetime, task_name: str) -> None:
        """Record the start, end and elapsed time of a named task.

        :param start_date_time: When the task started.
        :param task_name: The name of the task.
        """
end_date_time = datetime.now()
self._runtime_data.append(
RuntimeData(
task=task_name,
start=format_datetime(start_date_time),
end=format_datetime(end_date_time),
elapsed=(end_date_time - start_date_time).total_seconds()
)
)
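

# A minimal command line entry point (a sketch for illustration; the argument
# handling here is an assumption, not part of the package CLI):
if __name__ == "__main__":
    import sys

    # Expects the project folder path as the first command line argument.
    Pipeline(project_folder_path=sys.argv[1]).run_alignment_and_connection_process()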