Source code for kgx.graph_operations.meta_knowledge_graph

from typing import Dict, List, Optional, Any, Callable, Set, Tuple, Union
from sys import stderr

import re

import yaml
from json import dump
from json.encoder import JSONEncoder

from kgx.utils.kgx_utils import GraphEntityType
from kgx.prefix_manager import PrefixManager
from kgx.graph.base_graph import BaseGraph

"""
Generate a knowledge map that corresponds to TRAPI KnowledgeMap.
Specification based on TRAPI Draft PR: https://github.com/NCATSTranslator/ReasonerAPI/pull/171
"""


####################################################################
# Next Generation Implementation of Graph Summary coding which
# leverages the new "Transformer.process()" data stream "Inspector"
# design pattern, implemented here as a "Callable" inspection class.
####################################################################
[docs]def mkg_default(o): """ JSONEncoder 'default' function override to properly serialize 'Set' objects (into 'List') """ if isinstance(o, MetaKnowledgeGraph.Category): return o.json_object() else: try: iterable = iter(o) except TypeError: pass else: return list(iterable) # Let the base class default method raise the TypeError return JSONEncoder().default(o)
_category_curie_regexp = re.compile("^biolink:[A-Z][a-zA-Z]*$") _predicate_curie_regexp = re.compile("^biolink:[a-z][a-z_]*$") # DRY parse warning message def _parse_warning(prefix: str, item: str, suffix: str = None): print( prefix + " '" + item + "'" + (" " + suffix if suffix else "") + "? Ignoring...", file=MetaKnowledgeGraph.error_log, )
[docs]class MetaKnowledgeGraph: """ Class for generating a TRAPI 1.1 style of "meta knowledge graph" summary. The optional 'progress_monitor' for the validator should be a lightweight Callable which is injected into the class 'inspector' Callable, designed to intercepts node and edge records streaming through the Validator (inside a Transformer.process() call. The first (GraphEntityType) argument of the Callable tags the record as a NODE or an EDGE. The second argument given to the Callable is the current record itself. This Callable is strictly meant to be procedural and should *not* mutate the record. The intent of this Callable is to provide a hook to KGX applications wanting the namesake function of passively monitoring the graph data stream. As such, the Callable could simply tally up the number of times it is called with a NODE or an EDGE, then provide a suitable (quick!) report of that count back to the KGX application. The Callable (function/callable class) should not modify the record and should be of low complexity, so as not to introduce a large computational overhead to validation! """ error_log = stderr
[docs] def __init__( self, name="", node_facet_properties: Optional[List] = None, edge_facet_properties: Optional[List] = None, progress_monitor: Optional[Callable[[GraphEntityType, List], None]] = None, error_log=None, **kwargs, ): """ MetaKnowledgeGraph constructor. Parameters ---------- name: str (Graph) name assigned to the summary. node_facet_properties: Optional[List] A list of node properties (e.g. knowledge_source tags) to facet on. For example, ``['provided_by']`` edge_facet_properties: Optional[List] A list of edge properties (e.g. knowledge_source tags) to facet on. For example, ``['original_knowledge_source', 'aggregator_knowledge_source']`` progress_monitor: Optional[Callable[[GraphEntityType, List], None]] Function given a peek at the current record being stream processed by the class wrapped Callable. error_log: Where to write any graph processing error message (stderr, by default). """ # formal args self.name = name # these facet properties are used mainly for knowledge_source counting # using Biolink 2.0 'knowledge_source' slot values if node_facet_properties: self.node_facet_properties: Optional[List] = node_facet_properties else: # node counts still default to 'provided_by' self.node_facet_properties: Optional[List] = ["provided_by"] if edge_facet_properties: self.edge_facet_properties: Optional[List] = edge_facet_properties else: # node counts still default to 'knowledge_source' self.edge_facet_properties: Optional[List] = ["knowledge_source"] self.progress_monitor: Optional[ Callable[[GraphEntityType, List], None] ] = progress_monitor if error_log: MetaKnowledgeGraph.error_log = open(error_log, "w") # internal attributes # For Nodes... self.node_catalog: Dict[str, List[int]] = dict() self.node_stats: Dict[str, MetaKnowledgeGraph.Category] = dict() # We no longer track 'unknown' categories in meta-knowledge-graph # computations since such nodes are not TRAPI 1.1 compliant categories # self.node_stats['unknown'] = self.Category('unknown') # For Edges... self.edge_record_count: int = 0 self.predicates: Dict = dict() self.association_map: Dict = dict() self.edge_stats = [] # Overall graph statistics self.graph_stats: Dict[str, Dict] = dict()
[docs] def get_name(self) -> str: """ Returns ------- str Currently assigned knowledge graph name. """ return self.name
[docs] def __call__(self, entity_type: GraphEntityType, rec: List): """ Transformer 'inspector' Callable, for analysing a stream of graph data. Parameters ---------- entity_type: GraphEntityType indicates what kind of record being passed to the function for analysis. rec: Dict Complete data dictionary of the given record. """ if self.progress_monitor: self.progress_monitor(entity_type, rec) if entity_type == GraphEntityType.EDGE: self.analyse_edge(*rec) elif entity_type == GraphEntityType.NODE: self.analyse_node(*rec) else: raise RuntimeError("Unexpected GraphEntityType: " + str(entity_type))
@staticmethod def get_facet_counts(facets: Optional[List], counts_by_source: Dict, data: Dict): unknown: bool = True for facet in facets: if facet in data: unknown = False if isinstance(data[facet], str): facet_values = [data[facet]] else: # assume regular iterable facet_values = list(data[facet]) for s in facet_values: if facet not in counts_by_source: counts_by_source[facet] = dict() if s in counts_by_source[facet]: counts_by_source[facet][s] += 1 else: counts_by_source[facet][s] = 1 if unknown: if "unknown" in counts_by_source: counts_by_source["unknown"] += 1 else: counts_by_source["unknown"] = 1
[docs] class Category: """ Internal class for compiling statistics about a distinct category. """ # The 'category map' just associates a unique int catalog # index ('cid') value as a proxy for the full curie string, # to reduce storage in the main node catalog _category_curie_map: List[str] = list()
[docs] def __init__(self, category_curie: str, mkg): """ MetaKnowledgeGraph.Category constructor. category_curie: str Biolink Model category CURIE identifier. """ if not ( _category_curie_regexp.fullmatch(category_curie) or category_curie == "unknown" ): raise RuntimeError("Invalid Biolink category CURIE: " + category_curie) self.category_curie = category_curie self.mkg = mkg if self.category_curie not in self._category_curie_map: self._category_curie_map.append(self.category_curie) self.category_stats: Dict[str, Any] = dict() self.category_stats["id_prefixes"] = set() self.category_stats["count"] = 0 self.category_stats["count_by_source"] = dict()
[docs] def get_name(self) -> str: """ Returns ------- str CURIE name of the category. """ return self.category_curie
[docs] def get_cid(self): """ Returns ------- int Internal MetaKnowledgeGraph index id for tracking a Category. """ return self._category_curie_map.index(self.category_curie)
[docs] @classmethod def get_category_curie_from_index(cls, cid: int) -> str: """ Parameters ---------- cid: int Internal MetaKnowledgeGraph index id for tracking a Category. Returns ------- str Curie identifier of the Category. """ return cls._category_curie_map[cid]
[docs] def get_id_prefixes(self) -> Set[str]: """ Returns ------- Set[str] Set of identifier prefix (strings) used by nodes of this Category. """ return self.category_stats["id_prefixes"]
[docs] def get_count(self) -> int: """ Returns ------- int Count of nodes which have this category. """ return self.category_stats["count"]
[docs] def get_count_by_source( self, facet: str = "provided_by", source: str = None ) -> Dict[str, Any]: """ Parameters ---------- facet: str Facet tag (default, 'provided_by') from which the count should be returned source: str Source name about which the count is desired. Returns ------- Dict Count of nodes, by node 'provided_by' knowledge source, for a given category. Returns dictionary of all source counts, if input 'source' argument is not specified. """ if source and facet in self.category_stats["count_by_source"]: if source in self.category_stats["count_by_source"][facet]: return { source: self.category_stats["count_by_source"][facet][source] } else: return {source: 0} return self.category_stats["count_by_source"]
def _compile_prefix_stats(self, n: str): prefix = PrefixManager.get_prefix(n) if not prefix: print( f"Warning: node id {n} has no CURIE prefix", file=MetaKnowledgeGraph.error_log, ) else: if prefix not in self.category_stats["id_prefixes"]: self.category_stats["id_prefixes"].add(prefix) def _compile_category_source_stats(self, data: Dict): self.mkg.get_facet_counts( self.mkg.node_facet_properties, self.category_stats["count_by_source"], data, )
[docs] def analyse_node_category(self, n, data) -> None: """ Analyse metadata of a given graph node record of this category. Parameters ---------- n: str Curie identifier of the node record (not used here). data: Dict Complete data dictionary of node record fields. """ self.category_stats["count"] += 1 self._compile_prefix_stats(n) self._compile_category_source_stats(data)
[docs] def json_object(self): """ Returns ------- Dict[str, Any] Returns JSON friendly metadata for this category., """ return { "id_prefixes": list(self.category_stats["id_prefixes"]), "count": self.category_stats["count"], "count_by_source": self.category_stats["count_by_source"], }
[docs] def get_category(self, category_curie: str) -> Category: """ Counts the number of distinct (Biolink) categories encountered in the knowledge graph (not including those of 'unknown' category) Parameters ---------- category_curie: str Curie identifier for the (Biolink) category. Returns ------- Category MetaKnowledgeGraph.Category object for a given Biolink category. """ return self.node_stats[category_curie]
def _process_category_field(self, category_field: str, n: str, data: Dict): # we note here that category_curie *may be* # a piped '|' set of Biolink category CURIE values category_list = category_field.split("|") # analyse them each independently... for category_curie in category_list: if category_curie not in self.node_stats: try: self.node_stats[category_curie] = self.Category( category_curie, self ) except RuntimeError: _parse_warning("Invalid category CURIE", category_curie) continue category_record = self.node_stats[category_curie] category_idx: int = category_record.get_cid() if category_idx not in self.node_catalog[n]: self.node_catalog[n].append(category_idx) category_record.analyse_node_category(n, data)
[docs] def analyse_node(self, n: str, data: Dict) -> None: """ Analyse metadata of one graph node record. Parameters ---------- n: str Curie identifier of the node record (not used here). data: Dict Complete data dictionary of node record fields. """ # The TRAPI release 1.1 meta_knowledge_graph format indexes nodes by biolink:Category # the node 'category' field is a list of assigned categories (usually just one...). # However, this may perhaps sometimes result in duplicate counting and conflation of prefixes(?). if n in self.node_catalog: # Report duplications of node records, as discerned from node id. _parse_warning( "Duplicate node identifier", n, "encountered in input node data?" ) return else: self.node_catalog[n] = list() if "category" not in data or not data["category"]: # we now simply exclude nodes with missing categories from the count, since a category # of 'unknown' in the meta_knowledge_graph output is considered invalid. # category = self.node_stats['unknown'] # category.analyse_node_category(n, data) print( "Node with identifier '" + n + "' is missing its 'category' value? Ignoring in the analysis...", file=MetaKnowledgeGraph.error_log, ) return categories = data["category"] # analyse them each independently... for category_field in categories: self._process_category_field(category_field, n, data)
def _capture_predicate(self, data: Dict) -> Optional[str]: if "predicate" not in data: # We no longer track edges with 'unknown' predicates, # since those would not be TRAPI 1.1 JSON compliant... # self.predicates['unknown'] += 1 # predicate = "unknown" _parse_warning("Empty predicate CURIE in edge data", str(data)) self.edge_record_count -= 1 return None else: predicate = data["predicate"] if not _predicate_curie_regexp.fullmatch(predicate): _parse_warning("Invalid predicate CURIE", predicate) self.edge_record_count -= 1 return None if predicate not in self.predicates: # just need to track the number # of edge records using this predicate self.predicates[predicate] = 0 self.predicates[predicate] += 1 return predicate def _compile_triple_source_stats(self, triple: Tuple[str, str, str], data: Dict): self.get_facet_counts( self.edge_facet_properties, self.association_map[triple]["count_by_source"], data, ) def _process_triple( self, subject_category: str, predicate: str, object_category: str, data: Dict ): # Process the 'valid' S-P-O triple here... triple = (subject_category, predicate, object_category) if triple not in self.association_map: self.association_map[triple] = { "subject": triple[0], "predicate": triple[1], "object": triple[2], "relations": set(), "count_by_source": dict(), "count": 0, } if ( "relation" in data and data["relation"] not in self.association_map[triple]["relations"] ): self.association_map[triple]["relations"].add(data["relation"]) self.association_map[triple]["count"] += 1 self._compile_triple_source_stats(triple, data)
[docs] def analyse_edge(self, u, v, k, data) -> None: """ Analyse metadata of one graph edge record. Parameters ---------- u: str Subject node curie identifier of the edge. v: str Subject node curie identifier of the edge. k: str Key identifier of the edge record (not used here). data: Dict Complete data dictionary of edge record fields. """ # we blissfully assume that all the nodes of a # graph stream were analysed first by the MetaKnowledgeGraph # before the edges are analysed, thus we can test for # node 'n' existence internally, by identifier. # # Given the use case of multiple categories being assigned to a given node in a KGX data file, # either by category inheritance (ancestry all the way back up to NamedThing) # or by conflation (i.e. gene == protein id?), then the Cartesian product of # subject/object edges mappings need to be captured here. # self.edge_record_count += 1 predicate: str = self._capture_predicate(data) if not predicate: # relationship needs a predicate to process? return if u not in self.node_catalog: _parse_warning("Edge 'subject' node ID", u, "not found in node catalog") # removing from edge count self.edge_record_count -= 1 self.predicates[predicate] -= 1 return for subj_cat_idx in self.node_catalog[u]: subject_category: str = self.Category.get_category_curie_from_index( subj_cat_idx ) if v not in self.node_catalog: _parse_warning("Edge 'object' node ID", v, "not found in node catalog") self.edge_record_count -= 1 self.predicates[predicate] -= 1 return for obj_cat_idx in self.node_catalog[v]: object_category: str = self.Category.get_category_curie_from_index( obj_cat_idx ) self._process_triple(subject_category, predicate, object_category, data)
[docs] def get_number_of_categories(self) -> int: """ Counts the number of distinct (Biolink) categories encountered in the knowledge graph (not including those of 'unknown' category) Returns ------- int Number of distinct (Biolink) categories found in the graph (excluding nodes with 'unknown' category) """ # 'unknown' not tracked anymore... # return len([c for c in self.node_stats.keys() if c != 'unknown']) return len(self.node_stats.keys())
[docs] def get_node_stats(self) -> Dict[str, Category]: """ Returns ------- Dict[str, Category] Statistics for the nodes in the graph. """ # We no longer track 'unknown' node category counts - non TRAPI 1.1. compliant output # if 'unknown' in self.node_stats and not self.node_stats['unknown'].get_count(): # self.node_stats.pop('unknown') return self.node_stats
[docs] def get_edge_stats(self) -> List[Dict[str, Any]]: """ Returns ------- List[Dict[str, Any]] Knowledge map for the list of edges in the graph. """ # Not sure if this is "safe" but assume # that edge_stats may be cached once computed? if not self.edge_stats: for k, v in self.association_map.items(): kedge = v relations = list(v["relations"]) kedge["relations"] = relations self.edge_stats.append(kedge) return self.edge_stats
[docs] def get_total_nodes_count(self) -> int: """ Counts the total number of distinct nodes in the knowledge graph (**not** including those ignored due to being of 'unknown' category) Returns ------- int Number of distinct nodes in the knowledge. """ return len(self.node_catalog)
[docs] def get_node_count_by_category(self, category_curie: str) -> int: """ Counts the number of edges in the graph with the specified (Biolink) category curie. Parameters ---------- category_curie: str Curie identifier for the (Biolink) category. Returns ------- int Number of nodes for the given category. Raises ------ RuntimeError Error if category identifier is empty string or None. """ if not category_curie: raise RuntimeError( "get_node_count_by_category(): null or empty category argument!?" ) if category_curie in self.node_stats.keys(): return self.node_stats[category_curie].get_count() else: return 0
[docs] def get_total_node_counts_across_categories(self) -> int: """ The aggregate count of all node to category mappings for every category. Note that nodes with multiple categories will have their count replicated under each of its categories. Returns ------- int Total count of node to category mappings for the graph. """ count = 0 for category in self.node_stats.values(): count += category.get_count() return count
[docs] def get_total_edges_count(self) -> int: """ Gets the total number of 'valid' edges in the data set (ignoring those with 'unknown' subject or predicate category mappings) Returns ---------- int Total count of edges in the graph. """ return self.edge_record_count
[docs] def get_edge_mapping_count(self) -> int: """ Counts the number of distinct edge Subject (category) - P (predicate) -> Object (category) mappings in the knowledge graph. Returns ---------- int Count of subject(category) - predicate -> object(category) mappings in the graph. """ return len(self.get_edge_stats())
[docs] def get_predicate_count(self) -> int: """ Counts the number of distinct edge predicates in the knowledge graph. Returns ---------- int Number of distinct (Biolink) predicates in the graph. """ return len(self.predicates)
[docs] def get_edge_count_by_predicate(self, predicate_curie: str) -> int: """ Counts the number of edges in the graph with the specified predicate. Parameters ---------- predicate_curie: str (Biolink) curie identifier for the predicate. Returns ------- int Number of edges for the given predicate. Raises ------ RuntimeError Error if predicate identifier is empty string or None. """ if not predicate_curie: raise RuntimeError( "get_node_count_by_category(): null or empty predicate argument!?" ) if predicate_curie in self.predicates: return self.predicates[predicate_curie] return 0
[docs] def get_total_edge_counts_across_mappings(self) -> int: """ Aggregate count of the edges in the graph for every mapping. Edges with subject and object nodes with multiple assigned categories will have their count replicated under each distinct mapping of its categories. Returns ------- int Number of the edges counted across all mappings. """ count = 0 for edge in self.get_edge_stats(): count += edge["count"] return count
[docs] def get_edge_count_by_source( self, subject_category: str, predicate: str, object_category: str, facet: str = "knowledge_source", source: Optional[str] = None, ) -> Dict[str, Any]: """ Returns count by source for one S-P-O triple (S, O being Biolink categories; P, a Biolink predicate) """ if not (subject_category and predicate and object_category): print( "Warning: get_edge_count_by_source() has some empty S-P-O arguments? Don't know what you are counting!", file=MetaKnowledgeGraph.error_log, ) return dict() triple = (subject_category, predicate, object_category) if ( triple in self.association_map and "count_by_source" in self.association_map[triple] ): if facet in self.association_map[triple]["count_by_source"]: if source: if source in self.association_map[triple]["count_by_source"][facet]: return self.association_map[triple]["count_by_source"][facet][ source ] else: return dict() else: return self.association_map[triple]["count_by_source"][facet] else: return dict() else: print( "Warning: get_edge_count_by_source(): unknown triple " + "(" + subject_category + "," + predicate + "," + object_category + ")?", file=MetaKnowledgeGraph.error_log, ) return dict()
[docs] def summarize_graph_nodes(self, graph: BaseGraph) -> Dict: """ Summarize the nodes in a graph. Parameters ---------- graph: kgx.graph.base_graph.BaseGraph The graph Returns ------- Dict The node stats """ for n, data in graph.nodes(data=True): self.analyse_node(n, data) return self.get_node_stats()
[docs] def summarize_graph_edges(self, graph: BaseGraph) -> List[Dict]: """ Summarize the edges in a graph. Parameters ---------- graph: kgx.graph.base_graph.BaseGraph The graph Returns ------- List[Dict] The edge stats """ for u, v, k, data in graph.edges(keys=True, data=True): self.analyse_edge(u, v, k, data) return self.get_edge_stats()
[docs] def summarize_graph(self, graph: BaseGraph, name: str = None, **kwargs) -> Dict: """ Generate a meta knowledge graph that describes the composition of the graph. Parameters ---------- graph: kgx.graph.base_graph.BaseGraph The graph name: Optional[str] Name for the graph kwargs: Dict Any additional arguments (ignored in this method at present) Returns ------- Dict A TRAPI 1.1 compliant meta knowledge graph of the knowledge graph returned as a dictionary. """ if not self.graph_stats: node_stats = self.summarize_graph_nodes(graph) edge_stats = self.summarize_graph_edges(graph) # JSON sent back as TRAPI 1.1 version, # without the global 'knowledge_map' object tag self.graph_stats = {"nodes": node_stats, "edges": edge_stats} if name: self.graph_stats["name"] = name else: self.graph_stats["name"] = self.name return self.graph_stats
[docs] def get_graph_summary(self, name: str = None, **kwargs) -> Dict: """ Similar to summarize_graph except that the node and edge statistics are already captured in the MetaKnowledgeGraph class instance (perhaps by Transformer.process() stream inspection) and therefore, the data structure simply needs to be 'finalized' for saving or similar use. Parameters ---------- name: Optional[str] Name for the graph (if being renamed) kwargs: Dict Any additional arguments (ignored in this method at present) Returns ------- Dict A TRAPI 1.1 compliant meta knowledge graph of the knowledge graph returned as a dictionary. """ if not self.graph_stats: # JSON sent back as TRAPI 1.1 version, # without the global 'knowledge_map' object tag self.graph_stats = { "nodes": self.get_node_stats(), "edges": self.get_edge_stats(), } if name: self.graph_stats["name"] = name else: self.graph_stats["name"] = self.name return self.graph_stats
[docs] def save(self, file, name: str = None, file_format: str = "json") -> None: """ Save the current MetaKnowledgeGraph to a specified (open) file (device). Parameters ---------- file: File Text file handler open for writing. name: str Optional string to which to (re-)name the graph. file_format: str Text output format ('json' or 'yaml') for the saved meta knowledge graph (default: 'json') Returns ------- None """ stats = self.get_graph_summary(name) if not file_format or file_format == "json": dump(stats, file, indent=4, default=mkg_default) else: yaml.dump(stats, file)
[docs]def generate_meta_knowledge_graph(graph: BaseGraph, name: str, filename: str) -> None: """ Generate a knowledge map that describes the composition of the graph and write to ``filename``. Parameters ---------- graph: kgx.graph.base_graph.BaseGraph The graph name: Optional[str] Name for the graph filename: str The file to write the knowledge map to """ graph_stats = summarize_graph(graph, name) with open(filename, mode="w") as mkgh: dump(graph_stats, mkgh, indent=4, default=mkg_default)
[docs]def summarize_graph(graph: BaseGraph, name: str = None, **kwargs) -> Dict: """ Generate a meta knowledge graph that describes the composition of the graph. Parameters ---------- graph: kgx.graph.base_graph.BaseGraph The graph name: Optional[str] Name for the graph kwargs: Dict Any additional arguments Returns ------- Dict A TRAPI 1.1 compliant meta knowledge graph of the knowledge graph returned as a dictionary. """ mkg = MetaKnowledgeGraph(name) return mkg.summarize_graph(graph)