import importlib
import os
from os.path import dirname, abspath
import sys
from multiprocessing import Pool
from typing import List, Tuple, Optional, Dict, Set, Any, Union
import yaml
from kgx.validator import Validator
from kgx.sink import Sink
from kgx.transformer import Transformer, SOURCE_MAP, SINK_MAP
from kgx.config import get_logger
from kgx.graph.base_graph import BaseGraph
from kgx.graph_operations.graph_merge import merge_all_graphs
from kgx.graph_operations import summarize_graph, meta_knowledge_graph
from kgx.utils.kgx_utils import apply_graph_operations, knowledge_provenance_properties
summary_report_types = {
"kgx-map": summarize_graph.GraphSummary,
"meta-knowledge-graph": meta_knowledge_graph.MetaKnowledgeGraph,
}
log = get_logger()
def get_output_file_types() -> Tuple:
"""
Get all output file formats supported by KGX.
Returns
-------
Tuple
A tuple of supported file formats
"""
return tuple(SINK_MAP.keys())
def graph_summary(
inputs: List[str],
input_format: str,
input_compression: Optional[str],
output: Optional[str],
report_type: str,
report_format: Optional[str] = None,
stream: bool = False,
graph_name: Optional[str] = None,
node_facet_properties: Optional[List] = None,
edge_facet_properties: Optional[List] = None,
error_log: str = "",
) -> Dict:
"""
Loads and summarizes a knowledge graph from a set of input files.
Parameters
----------
inputs: List[str]
Input files
input_format: str
Input file format
input_compression: Optional[str]
The input compression type
output: Optional[str]
Where to write the output (stdout, by default)
report_type: str
The summary report type
report_format: Optional[str]
The summary report format ('yaml' or 'json')
stream: bool
Whether to parse input as a stream
graph_name: Optional[str]
User-specified name of the graph being summarized
node_facet_properties: Optional[List]
A list of node properties from which to generate counts per value for those properties.
For example, ``['provided_by']``
edge_facet_properties: Optional[List]
A list of edge properties (e.g. knowledge_source tags) to facet on.
For example, ``['original_knowledge_source', 'aggregator_knowledge_source']``
error_log: str
Where to write any graph processing error message (stderr, by default)
Returns
-------
Dict
A dictionary with the graph stats
"""
if not graph_name:
graph_name = "Graph"
if report_format and report_format not in get_report_format_types():
raise ValueError(f"report_format must be one of {get_report_format_types()}")
if report_type in summary_report_types:
# New design pattern enabling 'stream' processing of statistics on a small memory footprint
# by injecting an inspector in the Transformer.process() source-to-sink data flow.
#
# First, we instantiate the Inspector (generally, a Callable class)...
#
inspector = summary_report_types[report_type](
# ...thus, there is no need to hand the Inspector the graph;
# rather, the inspector will see the graph data after
# being injected into the Transformer.transform() workflow
# graph=transformer.store.graph,
name=graph_name,
node_facet_properties=node_facet_properties,
edge_facet_properties=edge_facet_properties,
error_log=error_log,
)
else:
raise ValueError(f"report_type must be one of {summary_report_types.keys()}")
if stream:
output_args = {
"format": "null"
} # streaming processing throws the graph data away
else:
output_args = None
transformer = Transformer(stream=stream)
transformer.transform(
input_args={
"filename": inputs,
"format": input_format,
"compression": input_compression,
},
output_args=output_args,
# ... Second, we inject the Inspector into the transform() call,
# for the underlying Transformer.process() to use...
inspector=inspector,
)
if output:
with open(output, "w") as gsr:
inspector.save(gsr, file_format=report_format)
else:
inspector.save(sys.stdout, file_format=report_format)
# ... Third, we directly return the graph statistics to the caller.
return inspector.get_graph_summary()
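# Example (illustrative sketch, not part of the module; file and graph names are
# hypothetical): stream a TSV knowledge graph through the summary inspector and
# write a meta-knowledge-graph report as JSON.
#
#   stats = graph_summary(
#       inputs=["my_graph_nodes.tsv", "my_graph_edges.tsv"],
#       input_format="tsv",
#       input_compression=None,
#       output="my_graph_meta_kg.json",
#       report_type="meta-knowledge-graph",
#       report_format="json",
#       stream=True,
#       graph_name="my-graph",
#   )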
def validate(
inputs: List[str],
input_format: str,
input_compression: Optional[str],
output: Optional[str],
stream: bool,
biolink_release: Optional[str] = None,
) -> List:
"""
Run KGX validator on an input file to check for Biolink Model compliance.
Parameters
----------
inputs: List[str]
Input files
input_format: str
The input format
input_compression: Optional[str]
The input compression type
output: Optional[str]
Path to output file (stdout, by default)
stream: bool
Whether to parse input as a stream.
biolink_release: Optional[str] = None
SemVer version of Biolink Model Release used for validation (default: latest Biolink Model Toolkit version)
Returns
-------
List
Returns a list of errors, if any
"""
# New design pattern enabling 'stream' processing of validation on a small memory footprint
# by injecting an inspector into the Transformer.process() source-to-sink data flow.
#
# First, we instantiate a Validator (a Callable class) to serve as the Inspector ...
# In this "Inspector" design pattern, the inspector must be instantiated before the Transformer.
#
Validator.set_biolink_model(biolink_release)
# Validator assumes the currently set Biolink Release
validator = Validator()
if stream:
transformer = Transformer(stream=stream)
transformer.transform(
input_args={
"filename": inputs,
"format": input_format,
"compression": input_compression,
},
output_args={
"format": "null"
}, # streaming processing throws the graph data away
# ... Second, we inject the Inspector into the transform() call,
# for the underlying Transformer.process() to use...
inspector=validator,
)
else:
# "Classical" non-streaming mode, with click.progressbar
# but an unfriendly large memory footprint for large graphs
transformer = Transformer()
transformer.transform(
{
"filename": inputs,
"format": input_format,
"compression": input_compression,
},
)
# A slight tweak of the classical 'validate' function: the
# list of errors is cached internally in the Validator object
validator.validate(transformer.store.graph)
if output:
validator.write_report(open(output, "w"))
else:
validator.write_report(sys.stdout)
# ... Third, we return directly any validation errors to the caller
return validator.get_errors()
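# Example (illustrative sketch, not part of the module; file names and the Biolink
# release value are hypothetical): validate a JSON Lines knowledge graph against a
# pinned Biolink Model release, streaming to keep the memory footprint small.
#
#   errors = validate(
#       inputs=["my_graph_nodes.jsonl", "my_graph_edges.jsonl"],
#       input_format="jsonl",
#       input_compression=None,
#       output="validation_errors.txt",
#       stream=True,
#       biolink_release="2.2.0",
#   )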
def neo4j_download(
uri: str,
username: str,
password: str,
output: str,
output_format: str,
output_compression: Optional[str],
stream: bool,
node_filters: Optional[Tuple] = None,
edge_filters: Optional[Tuple] = None,
) -> Transformer:
"""
Download nodes and edges from Neo4j database.
Parameters
----------
uri: str
Neo4j URI. For example, https://localhost:7474
username: str
Username for authentication
password: str
Password for authentication
output: str
Where to write the output (stdout, by default)
output_format: Optional[str]
The output type (``tsv``, by default)
output_compression: Optional[str]
The output compression type
stream: bool
Whether to parse input as a stream
node_filters: Optional[Tuple]
Node filters
edge_filters: Optional[Tuple]
Edge filters
Returns
-------
kgx.Transformer
The Transformer instance used for the download
"""
transformer = Transformer(stream=stream)
transformer.transform(
{
"uri": uri,
"username": username,
"password": password,
"format": "neo4j",
"node_filters": node_filters,
"edge_filters": edge_filters,
}
)
if not output_format:
output_format = "tsv"
transformer.save(
{"filename": output, "format": output_format, "compression": output_compression}
)
return transformer
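# Example (illustrative sketch, not part of the module; connection details and file
# names are hypothetical): dump the contents of a local Neo4j instance to compressed
# TSV files.
#
#   transformer = neo4j_download(
#       uri="http://localhost:7474",
#       username="neo4j",
#       password="secret",
#       output="my_graph",
#       output_format="tsv",
#       output_compression="tar.gz",
#       stream=True,
#   )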
def neo4j_upload(
inputs: List[str],
input_format: str,
input_compression: Optional[str],
uri: str,
username: str,
password: str,
stream: bool,
node_filters: Optional[Tuple] = None,
edge_filters: Optional[Tuple] = None,
) -> Transformer:
"""
Upload a set of nodes/edges to a Neo4j database.
Parameters
----------
inputs: List[str]
A list of files that contains nodes/edges
input_format: str
The input format
input_compression: Optional[str]
The input compression type
uri: str
The full HTTP address for Neo4j database
username: str
Username for authentication
password: str
Password for authentication
stream: bool
Whether to parse input as a stream
node_filters: Optional[Tuple]
Node filters
edge_filters: Optional[Tuple]
Edge filters
Returns
-------
kgx.Transformer
The Transformer instance used for the upload
"""
transformer = Transformer(stream=stream)
transformer.transform(
{
"filename": inputs,
"format": input_format,
"compression": input_compression,
"node_filters": node_filters,
"edge_filters": edge_filters,
}
)
transformer.save(
{"uri": uri, "username": username, "password": password, "format": "neo4j"}
)
return transformer
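# Example (illustrative sketch, not part of the module; file names and credentials
# are hypothetical): load node and edge TSV files into a local Neo4j instance.
#
#   transformer = neo4j_upload(
#       inputs=["my_graph_nodes.tsv", "my_graph_edges.tsv"],
#       input_format="tsv",
#       input_compression=None,
#       uri="http://localhost:7474",
#       username="neo4j",
#       password="secret",
#       stream=False,
#   )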
def _validate_files(cwd: str, file_paths: List[str], context: str = ""):
"""
Utility method for resolving file paths
:param cwd: current working directory for resolving possible relative file path names
:param file_paths: list of file path names to resolve
:param context: optional source context of the file list
:return: resolved list of file paths (as absolute paths)
"""
resolved_files: List[str] = list()
for f in file_paths:
# check if the file exists as an absolute path
if not os.path.exists(f):
# otherwise, check if file exists as a path
# relative to the provided "current working directory"
f = abspath(cwd + "/" + f)
if not os.path.exists(f):
raise FileNotFoundError(
f"Filename '{f}' for source '{context}' does not exist!"
)
if not os.path.isfile(f):
raise FileNotFoundError(
f"Filename '{f}' for source '{context}' is not a file!"
)
resolved_files.append(f)
return resolved_files
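# Example (illustrative sketch, not part of the module; all paths are hypothetical):
# resolve one absolute and one relative file name against the directory containing
# a merge configuration.
#
#   files = _validate_files(
#       cwd="/data/merge",
#       file_paths=["/data/shared/ontology_nodes.tsv", "my_source_edges.tsv"],
#       context="my-source",
#   )
#   # -> ["/data/shared/ontology_nodes.tsv", "/data/merge/my_source_edges.tsv"]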
def _process_knowledge_source(ksf: str, spec: str) -> Union[str, bool, Tuple]:
if ksf not in knowledge_provenance_properties:
log.warning("Unknown Knowledge Source Field: " + ksf + "... ignoring!")
return False
else:
if spec.lower() == "true":
return True
elif spec.lower() == "false":
return False
else:
# A tuple specification is expected as a comma-delimited string
spec_parts = spec.split(",")
if len(spec_parts) == 1:
# assumed to be just a default string value for the knowledge source field
return spec_parts[0]
else:
# assumed to be an InfoRes Tuple rewrite specification, capped at three parts
if len(spec_parts) > 3:
spec_parts = spec_parts[:3]
return tuple(spec_parts)
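# Examples (illustrative, not part of the module; field names and values are
# hypothetical) of how a knowledge source specification string is interpreted:
#
#   _process_knowledge_source("aggregator_knowledge_source", "true")
#   # -> True
#   _process_knowledge_source("aggregator_knowledge_source", "infores:my-source")
#   # -> "infores:my-source" (a single default string value for the field)
#   _process_knowledge_source("knowledge_source", "my source,infores:my-source")
#   # -> ("my source", "infores:my-source") (an InfoRes rewrite tuple)
#   _process_knowledge_source("not_a_provenance_slot", "true")
#   # -> False, after logging a warning about the unknown field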
def merge(
merge_config: str,
source: Optional[List] = None,
destination: Optional[List] = None,
processes: int = 1,
) -> BaseGraph:
"""
Load nodes and edges from files and KGs, as defined in a config YAML, and merge them into a single graph.
The merged graph can then be written to a local/remote Neo4j instance OR be serialized into a file.
Parameters
----------
merge_config: str
Merge config YAML
source: Optional[List]
A list of sources to load from the YAML
destination: Optional[List]
A list of destinations to write to, as defined in the YAML
processes: int
Number of processes to use
Returns
-------
kgx.graph.base_graph.BaseGraph
The merged graph
"""
# Use the directory within which the 'merge_config' file
# exists as a 'current working directory' for
# resolving relative filename paths in the configuration.
cwd = dirname(merge_config)
with open(merge_config, "r") as YML:
cfg = yaml.load(YML, Loader=yaml.FullLoader)
output_directory = "output"
top_level_args = {}
if "configuration" in cfg:
top_level_args = prepare_top_level_args(cfg["configuration"])
if (
"output_directory" in cfg["configuration"]
and cfg["configuration"]["output_directory"]
):
output_directory = cfg["configuration"]["output_directory"]
if not output_directory.startswith(os.path.sep):
# relative path
output_directory = f"{os.path.abspath(os.path.dirname(merge_config))}{os.path.sep}{output_directory}"
if not os.path.exists(output_directory):
os.mkdir(output_directory)
if not source:
source = cfg["merged_graph"]["source"].keys()
if not destination:
destination = cfg["merged_graph"]["destination"].keys()
for s in source:
source_properties = cfg["merged_graph"]["source"][s]
if source_properties["input"]["format"] in get_input_file_types():
source_properties["input"]["filename"] = _validate_files(
cwd=cwd, file_paths=source_properties["input"]["filename"], context=s
)
sources_to_parse = {}
for key in cfg["merged_graph"]["source"]:
if key in source:
sources_to_parse[key] = cfg["merged_graph"]["source"][key]
results = []
pool = Pool(processes=processes)
for k, v in sources_to_parse.items():
log.info(f"Spawning process for '{k}'")
result = pool.apply_async(
parse_source,
(
k,
v,
output_directory,
top_level_args["prefix_map"],
top_level_args["node_property_predicates"],
top_level_args["predicate_mappings"],
top_level_args["checkpoint"],
),
)
results.append(result)
pool.close()
pool.join()
stores = [r.get() for r in results]
merged_graph = merge_all_graphs([x.graph for x in stores])
log.info(
f"Merged graph has {merged_graph.number_of_nodes()} nodes and {merged_graph.number_of_edges()} edges"
)
if "name" in cfg["merged_graph"]:
merged_graph.name = cfg["merged_graph"]["name"]
if "operations" in cfg["merged_graph"]:
apply_graph_operations(merged_graph, cfg["merged_graph"]["operations"])
destination_to_write: Dict[str, Dict] = {}
for d in destination:
if d in cfg["merged_graph"]["destination"]:
destination_to_write[d] = cfg["merged_graph"]["destination"][d]
else:
raise KeyError(f"Cannot find destination '{d}' in YAML")
# write the merged graph
node_properties = set()
edge_properties = set()
for s in stores:
node_properties.update(s.node_properties)
edge_properties.update(s.edge_properties)
input_args = {"graph": merged_graph, "format": "graph"}
if destination_to_write:
for key, destination_info in destination_to_write.items():
log.info(f"Writing merged graph to {key}")
output_args = {
"format": destination_info["format"],
"reverse_prefix_map": top_level_args["reverse_prefix_map"],
"reverse_predicate_mappings": top_level_args[
"reverse_predicate_mappings"
],
}
if "reverse_prefix_map" in destination_info:
output_args["reverse_prefix_map"].update(
destination_info["reverse_prefix_map"]
)
if "reverse_predicate_mappings" in destination_info:
output_args["reverse_predicate_mappings"].update(
destination_info["reverse_predicate_mappings"]
)
if destination_info["format"] == "neo4j":
output_args["uri"] = destination_info["uri"]
output_args["username"] = destination_info["username"]
output_args["password"] = destination_info["password"]
elif destination_info["format"] in get_input_file_types():
filename = destination_info["filename"]
if isinstance(filename, list):
filename = filename[0]
destination_filename = f"{output_directory}/{filename}"
output_args["filename"] = destination_filename
output_args["compression"] = (
destination_info["compression"]
if "compression" in destination_info
else None
)
if destination_info["format"] == "nt":
output_args["property_types"] = top_level_args["property_types"]
if "property_types" in top_level_args and "property_types" in destination_info:
output_args["property_types"].update(destination_info["property_types"])
if destination_info["format"] in {"csv", "tsv"}:
output_args["node_properties"] = node_properties
output_args["edge_properties"] = edge_properties
else:
raise TypeError(
f"type {destination_info['format']} not yet supported for KGX merge operation."
)
transformer = Transformer()
transformer.transform(input_args, output_args)
else:
log.warning(
f"No destination provided in {merge_config}. The merged graph will not be persisted."
)
return merged_graph
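# Illustrative sketch (not part of the module) of the kind of merge config YAML this
# function expects; source and destination names, file names and credentials are all
# hypothetical, and an optional 'operations' section may also be given (see
# apply_operations below):
#
#   configuration:
#     output_directory: output
#     checkpoint: false
#   merged_graph:
#     name: my-merged-graph
#     source:
#       my-source:
#         input:
#           format: tsv
#           filename:
#             - my_source_nodes.tsv
#             - my_source_edges.tsv
#     destination:
#       my-merged-tsv:
#         format: tsv
#         compression: tar.gz
#         filename: my_merged_graph
#       my-neo4j:
#         format: neo4j
#         uri: http://localhost:7474
#         username: neo4j
#         password: secret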
def parse_source(
key: str,
source: dict,
output_directory: str,
prefix_map: Dict[str, str] = None,
node_property_predicates: Set[str] = None,
predicate_mappings: Dict[str, str] = None,
checkpoint: bool = False,
) -> Sink:
"""
Parse a source from a merge config YAML.
Parameters
----------
key: str
Source key
source: Dict
Source configuration
output_directory: str
Location to write output to
prefix_map: Dict[str, str]
Non-canonical CURIE mappings
node_property_predicates: Set[str]
A set of predicates that ought to be treated as node properties (This is applicable for RDF)
predicate_mappings: Dict[str, str]
A mapping of predicate IRIs to property names (This is applicable for RDF)
checkpoint: bool
Whether to serialize each individual source to a TSV
Returns
-------
kgx.sink.sink.Sink
Returns an instance of Sink
"""
log.info(f"Processing source '{key}'")
if not key:
key = os.path.basename(source["input"]["filename"][0])
input_args = prepare_input_args(
key,
source,
output_directory,
prefix_map,
node_property_predicates,
predicate_mappings,
)
transformer = Transformer()
transformer.transform(input_args)
transformer.store.graph.name = key
if checkpoint:
log.info(f"Writing checkpoint for source '{key}'")
checkpoint_output = f"{output_directory}/{key}" if output_directory else key
transformer.save({"filename": checkpoint_output, "format": "tsv"})
# Current "Callable" metadata not needed at this point
# but causes peculiar problems downstream, so we clear it.
transformer.store.clear_graph_metadata()
return transformer.store
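# Example (illustrative sketch, not part of the module; the source dictionary mirrors
# one entry of a merge config YAML and the file names are hypothetical):
#
#   sink = parse_source(
#       key="my-source",
#       source={
#           "input": {
#               "format": "tsv",
#               "filename": ["my_source_nodes.tsv", "my_source_edges.tsv"],
#           }
#       },
#       output_directory="output",
#       prefix_map={},
#       node_property_predicates=set(),
#       predicate_mappings={},
#       checkpoint=False,
#   )
#   merged = merge_all_graphs([sink.graph])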
def prepare_output_args(
key: str,
source: Dict,
output_directory: Optional[str],
reverse_prefix_map: Dict = None,
reverse_predicate_mappings: Dict = None,
property_types: Dict = None,
) -> Dict:
"""
Prepare output arguments for Transformer.
Parameters
----------
key: str
Source key
source: Dict
Source configuration
output_directory: str
Location to write output to
reverse_prefix_map: Dict[str, str]
Non-canonical CURIE mappings for export
reverse_predicate_mappings: Dict[str, str]
A mapping of property names to predicate IRIs (This is applicable for RDF)
property_types: Dict[str, str]
The xml property type for properties that are other than ``xsd:string``.
Relevant for RDF export.
Returns
-------
Dict
Output arguments as dictionary
"""
output_format = source["output"]["format"]
output_compression = (
source["output"]["compression"] if "compression" in source["output"] else None
)
output_filename = (
source["output"]["filename"] if "filename" in source["output"] else key
)
source_reverse_prefix_map = reverse_prefix_map.copy() if reverse_prefix_map else {}
source_reverse_prefix_map.update(
source["reverse_prefix_map"]
if "reverse_prefix_map" in source and source["reverse_prefix_map"]
else {}
)
source_reverse_predicate_mappings = (
reverse_predicate_mappings.copy() if reverse_predicate_mappings else {}
)
source_reverse_predicate_mappings.update(
source["reverse_predicate_mappings"]
if "reverse_predicate_mappings" in source
and source["reverse_predicate_mappings"] is not None
else {}
)
source_property_types = property_types.copy() if property_types else {}
if "property_types" in source and source["property_types"] is not None:
source_property_types.update(source["property_types"])
if isinstance(output_filename, list):
output = output_filename[0]
else:
output = output_filename
if output_directory and not output.startswith(output_directory):
output = os.path.join(output_directory, output)
output_args = {"format": output_format}
if output_format == "neo4j":
output_args["uri"] = source["output"]["uri"]
output_args["username"] = source["output"]["username"]
output_args["password"] = source["output"]["password"]
elif output_format in get_input_file_types():
output_args["filename"] = output
output_args["compression"] = output_compression
if output_format == "nt":
output_args["reify_all_edges"] = (
source["output"]["reify_all_edges"]
if "reify_all_edges" in source["output"]
else False
)
output_args["reverse_prefix_map"] = source_reverse_prefix_map
output_args[
"reverse_predicate_mappings"
] = source_reverse_predicate_mappings
output_args["property_types"] = source_property_types
else:
raise ValueError(f"type {output_format} not yet supported for output")
return output_args
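# Example (illustrative sketch, not part of the module; names are hypothetical):
# a source whose output section targets compressed TSV files under an output directory.
#
#   output_args = prepare_output_args(
#       key="my-source",
#       source={"output": {"format": "tsv", "filename": "my_graph", "compression": "tar.gz"}},
#       output_directory="output",
#   )
#   # -> {"format": "tsv", "filename": "output/my_graph", "compression": "tar.gz"}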
def apply_operations(source: dict, graph: BaseGraph) -> BaseGraph:
"""
Apply operations as defined in the YAML.
Parameters
----------
source: dict
The source from the YAML
graph: kgx.graph.base_graph.BaseGraph
The graph corresponding to the source
Returns
-------
kgx.graph.base_graph.BaseGraph
The graph corresponding to the source
"""
operations = source["operations"]
for operation in operations:
op_name = operation["name"]
op_args = operation["args"]
module_name = ".".join(op_name.split(".")[0:-1])
function_name = op_name.split(".")[-1]
f = getattr(importlib.import_module(module_name), function_name)
log.info(f"Applying operation {op_name} with args: {op_args}")
f(graph, **op_args)
return graph
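# Illustrative sketch (not part of the module) of an 'operations' block in the YAML.
# The operation name below assumes the kgx.graph_operations.summarize_graph.generate_graph_stats
# helper; any importable function that takes the graph as its first argument can be
# listed the same way, with keyword arguments under 'args':
#
#   operations:
#     - name: kgx.graph_operations.summarize_graph.generate_graph_stats
#       args:
#         graph_name: my-graph
#         filename: graph_stats.yaml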
def prepare_top_level_args(d: Dict) -> Dict:
"""
Parse top-level configuration.
Parameters
----------
d: Dict
The configuration section from the transform/merge YAML
Returns
-------
Dict
A parsed dictionary with parameters from configuration
"""
args = {}
if "checkpoint" in d and d["checkpoint"] is not None:
args["checkpoint"] = d["checkpoint"]
else:
args["checkpoint"] = False
if "node_property_predicates" in d and d["node_property_predicates"]:
args["node_property_predicates"] = set(d["node_property_predicates"])
else:
args["node_property_predicates"] = set()
if "predicate_mappings" in d and d["predicate_mappings"]:
args["predicate_mappings"] = d["predicate_mappings"]
else:
args["predicate_mappings"] = {}
if "prefix_map" in d and d["prefix_map"]:
args["prefix_map"] = d["prefix_map"]
else:
args["prefix_map"] = {}
if "reverse_prefix_map" in d and d["reverse_prefix_map"] is not None:
args["reverse_prefix_map"] = d["reverse_prefix_map"]
else:
args["reverse_prefix_map"] = {}
if (
"reverse_predicate_mappings" in d
and d["reverse_predicate_mappings"] is not None
):
args["reverse_predicate_mappings"] = d["reverse_predicate_mappings"]
else:
args["reverse_predicate_mappings"] = {}
if "property_types" in d and d["property_types"]:
args["property_types"] = d["property_types"]
else:
args["property_types"] = {}
return args
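# Example (illustrative, not part of the module; values are hypothetical): a minimal
# 'configuration' section and the corresponding parsed arguments, with unset entries
# defaulted.
#
#   args = prepare_top_level_args(
#       {
#           "checkpoint": True,
#           "prefix_map": {"OMIM": "https://omim.org/entry/"},
#       }
#   )
#   # -> {
#   #      "checkpoint": True,
#   #      "node_property_predicates": set(),
#   #      "predicate_mappings": {},
#   #      "prefix_map": {"OMIM": "https://omim.org/entry/"},
#   #      "reverse_prefix_map": {},
#   #      "reverse_predicate_mappings": {},
#   #      "property_types": {},
#   #    }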