from typing import Dict, List, Optional, Any, Callable, Set
from sys import stderr
import re
import yaml
from json import dump
from json.encoder import JSONEncoder
from kgx.utils.kgx_utils import GraphEntityType
from kgx.graph.base_graph import BaseGraph
from kgx.prefix_manager import PrefixManager
# Dictionary keys used in the node statistics compiled by GraphSummary.
TOTAL_NODES = "total_nodes"
NODE_CATEGORIES = "node_categories"
NODE_ID_PREFIXES_BY_CATEGORY = "node_id_prefixes_by_category"
NODE_ID_PREFIXES = "node_id_prefixes"
COUNT_BY_CATEGORY = "count_by_category"
COUNT_BY_ID_PREFIXES_BY_CATEGORY = "count_by_id_prefixes_by_category"
COUNT_BY_ID_PREFIXES = "count_by_id_prefixes"
# Dictionary keys used in the edge statistics compiled by GraphSummary.
TOTAL_EDGES = "total_edges"
EDGE_PREDICATES = "predicates"
COUNT_BY_EDGE_PREDICATES = "count_by_predicates"
COUNT_BY_SPO = "count_by_spo"
# Note: the format of the stats generated might change in the future
####################################################################################
# New "Inspector Class" design pattern for KGX stream data processing
####################################################################################
def gs_default(o):
    """
    JSONEncoder 'default' hook override that serializes
    GraphSummary.Category objects (via their JSON view) and any
    other iterable (e.g. a 'Set') into a JSON-friendly 'List'.
    """
    if isinstance(o, GraphSummary.Category):
        return o.json_object()
    try:
        iterator = iter(o)
    except TypeError:
        # Not iterable either: let the base class
        # default method raise the TypeError.
        return JSONEncoder().default(o)
    return list(iterator)
# Patterns for syntactically well-formed Biolink category and predicate CURIEs.
_category_curie_regexp = re.compile("^biolink:[A-Z][a-zA-Z]*$")
_predicate_curie_regexp = re.compile("^biolink:[a-z][a-z_]*$")
# DRY parse warning message
def _parse_warning(prefix: str, item: str, suffix: str = None):
    """Emit a uniformly formatted 'Ignoring...' parse warning to the error log."""
    tail = " " + suffix if suffix else ""
    print(
        prefix + " '" + item + "'" + tail + "? Ignoring...",
        file=GraphSummary.error_log,
    )
class GraphSummary:
    """
    Class for generating a "classical" knowledge graph summary.
    The optional 'progress_monitor' for the validator should be a lightweight Callable
    which is injected into the class 'inspector' Callable, designed to intercept
    node and edge records streaming through the Validator (inside a Transformer.process() call).
    The first (GraphEntityType) argument of the Callable tags the record as a NODE or an EDGE.
    The second argument given to the Callable is the current record itself.
    This Callable is strictly meant to be procedural and should *not* mutate the record.
    The intent of this Callable is to provide a hook to KGX applications wanting the
    namesake function of passively monitoring the graph data stream. As such, the Callable
    could simply tally up the number of times it is called with a NODE or an EDGE, then
    provide a suitable (quick!) report of that count back to the KGX application. The
    Callable (function/callable class) should not modify the record and should be of low
    complexity, so as not to introduce a large computational overhead to validation!
    """
    # Class-level destination for warning messages; may be redirected to a
    # file by passing an 'error_log' path to the constructor.
    error_log = stderr
[docs] def __init__(
self,
name="",
node_facet_properties: Optional[List] = None,
edge_facet_properties: Optional[List] = None,
progress_monitor: Optional[Callable[[GraphEntityType, List], None]] = None,
error_log: str = None,
**kwargs,
):
"""
GraphSummary constructor.
Parameters
----------
name: str
(Graph) name assigned to the summary.
node_facet_properties: Optional[List]
A list of properties to facet on. For example, ``['provided_by']``
edge_facet_properties: Optional[List]
A list of properties to facet on. For example, ``['knowledge_source']``
progress_monitor: Optional[Callable[[GraphEntityType, List], None]]
Function given a peek at the current record being stream processed by the class wrapped Callable.
error_log: str
Where to write any graph processing error message (stderr, by default)
"""
# formal arguments
self.name = name
self.nodes_processed = False
self.node_stats: Dict = {
TOTAL_NODES: 0,
NODE_CATEGORIES: set(),
NODE_ID_PREFIXES: set(),
NODE_ID_PREFIXES_BY_CATEGORY: dict(),
COUNT_BY_CATEGORY: dict(),
COUNT_BY_ID_PREFIXES_BY_CATEGORY: dict(),
COUNT_BY_ID_PREFIXES: dict(),
}
self.edges_processed: bool = False
self.edge_stats: Dict = {
TOTAL_EDGES: 0,
EDGE_PREDICATES: set(),
COUNT_BY_EDGE_PREDICATES: {"unknown": {"count": 0}},
COUNT_BY_SPO: {},
}
self.node_facet_properties: Optional[List] = node_facet_properties
if self.node_facet_properties:
for facet_property in self.node_facet_properties:
self.add_node_stat(facet_property, set())
self.edge_facet_properties: Optional[List] = edge_facet_properties
if self.edge_facet_properties:
for facet_property in self.edge_facet_properties:
self.edge_stats[facet_property] = set()
self.progress_monitor: Optional[
Callable[[GraphEntityType, List], None]
] = progress_monitor
if error_log:
GraphSummary.error_log = open(error_log, mode="w")
# internal attributes
self.node_catalog: Dict[str, List[int]] = dict()
self.node_categories: Dict[str, GraphSummary.Category] = dict()
# indexed internally with category index id '0'
self.node_categories["unknown"] = GraphSummary.Category("unknown", self)
self.graph_stats: Dict[str, Dict] = dict()
    def get_name(self):
        """
        Access the name given to the graph being summarized.

        Returns
        -------
        str
            Currently assigned knowledge graph name.
        """
        return self.name
[docs] def __call__(self, entity_type: GraphEntityType, rec: List):
"""
Transformer 'inspector' Callable, for analysing a stream of graph data.
Parameters
----------
entity_type: GraphEntityType
indicates what kind of record being passed to the function for analysis.
rec: Dict
Complete data dictionary of the given record.
"""
if self.progress_monitor:
self.progress_monitor(entity_type, rec)
if entity_type == GraphEntityType.EDGE:
self.analyse_edge(*rec)
elif entity_type == GraphEntityType.NODE:
self.analyse_node(*rec)
else:
raise RuntimeError("Unexpected GraphEntityType: " + str(entity_type))
[docs] class Category:
"""
Internal class for compiling statistics about a distinct category.
"""
# The 'category map' just associates a unique int catalog
# index ('cid') value as a proxy for the full curie string,
# to reduce storage in the main node catalog
_category_curie_map: List[str] = list()
[docs] def __init__(self, category_curie: str, summary):
"""
GraphSummary.Category constructor.
category: str
Biolink Model category curie identifier.
"""
if not (
_category_curie_regexp.fullmatch(category_curie)
or category_curie == "unknown"
):
raise RuntimeError("Invalid Biolink category CURIE: " + category_curie)
# generally, a Biolink category class CURIE but also 'unknown'
self.category_curie = category_curie
# it is useful to point to the GraphSummary within
# which this Category metadata is bring tracked...
self.summary = summary
# ...so that Category related entries at that
# higher level may be properly initialized
# for subsequent facet metadata access
if not self.category_curie == "unknown":
self.summary.node_stats[NODE_CATEGORIES].add(self.category_curie)
self.summary.node_stats[NODE_ID_PREFIXES_BY_CATEGORY][
self.category_curie
] = set()
self.summary.node_stats[COUNT_BY_CATEGORY][self.category_curie] = {
"count": 0
}
if self.category_curie not in self._category_curie_map:
self._category_curie_map.append(self.category_curie)
self.category_stats: Dict[str, Any] = dict()
self.category_stats["count"]: int = 0
self.category_stats["count_by_source"]: Dict[str, int] = {"unknown": 0}
self.category_stats["count_by_id_prefix"]: Dict[str, int] = dict()
[docs] def get_name(self) -> str:
"""
Returns
-------
str
Biolink CURIE name of the category.
"""
return self.category_curie
[docs] def get_cid(self) -> int:
"""
Returns
-------
int
Internal GraphSummary index id for tracking a Category.
"""
return self._category_curie_map.index(self.category_curie)
@classmethod
def _get_category_curie_by_index(cls, cid: int) -> str:
"""
Parameters
----------
cid: int
Internal GraphSummary index id for tracking a Category.
Returns
-------
str
Curie identifier of the Category.
"""
return cls._category_curie_map[cid]
[docs] def get_id_prefixes(self) -> Set:
"""
Returns
-------
Set[str]
Set of identifier prefix (strings) used by nodes of this Category.
"""
return set(self.category_stats["count_by_id_prefix"].keys())
[docs] def get_count_by_id_prefixes(self):
"""
Returns
-------
int
Count of nodes by id_prefixes for nodes which have this category.
"""
return self.category_stats["count_by_id_prefix"]
[docs] def get_count(self):
"""
Returns
-------
int
Count of nodes which have this category.
"""
return self.category_stats["count"]
def _capture_prefix(self, n: str):
prefix = PrefixManager.get_prefix(n)
if not prefix:
print(
f"Warning: node id {n} has no CURIE prefix",
file=GraphSummary.error_log,
)
else:
if prefix in self.category_stats["count_by_id_prefix"]:
self.category_stats["count_by_id_prefix"][prefix] += 1
else:
self.category_stats["count_by_id_prefix"][prefix] = 1
def _capture_knowledge_source(self, data: Dict):
if "provided_by" in data:
for s in data["provided_by"]:
if s in self.category_stats["count_by_source"]:
self.category_stats["count_by_source"][s] += 1
else:
self.category_stats["count_by_source"][s] = 1
else:
self.category_stats["count_by_source"]["unknown"] += 1
[docs] def analyse_node_category(self, summary, n, data):
"""
Analyse metadata of a given graph node record of this category.
Parameters
----------
summary: GraphSummary
GraphSunmmary within which the Category is being analysed.
n: str
Curie identifier of the node record (not used here).
data: Dict
Complete data dictionary of node record fields.
"""
self.category_stats["count"] += 1
self._capture_prefix(n)
self._capture_knowledge_source(data)
if summary.node_facet_properties:
for facet_property in summary.node_facet_properties:
summary.node_stats = summary.get_facet_counts(
data,
summary.node_stats,
COUNT_BY_CATEGORY,
self.category_curie,
facet_property,
)
[docs] def json_object(self):
"""
Returns
-------
Dict[str, Any]
Returns JSON friendly metadata for this category.,
"""
return {
"id_prefixes": list(self.category_stats["count_by_id_prefix"].keys()),
"count": self.category_stats["count"],
"count_by_source": self.category_stats["count_by_source"],
"count_by_id_prefix": self.category_stats["count_by_id_prefix"],
}
[docs] def get_category(self, category_curie: str) -> Category:
"""
Counts the number of distinct (Biolink) categories encountered
in the knowledge graph (not including those of 'unknown' category)
Parameters
----------
category_curie: str
Curie identifier for the (Biolink) category.
Returns
-------
Category
MetaKnowledgeGraph.Category object for a given Biolink category.
"""
return self.node_stats[category_curie]
def _process_category_field(self, category_field: str, n: str, data: Dict):
# we note here that category_curie *may be*
# a piped '|' set of Biolink category CURIE values
category_list = category_field.split("|")
# analyse them each independently...
for category_curie in category_list:
if category_curie not in self.node_categories:
try:
self.node_categories[category_curie] = self.Category(
category_curie, self
)
except RuntimeError:
_parse_warning("Invalid category CURIE", category_curie)
continue
category_record = self.node_categories[category_curie]
category_idx: int = category_record.get_cid()
if category_idx not in self.node_catalog[n]:
self.node_catalog[n].append(category_idx)
category_record.analyse_node_category(self, n, data)
#
# Moved this computation from the 'analyse_node_category() method above
#
# if self.node_facet_properties:
# for facet_property in self.node_facet_properties:
# self.node_stats = self.get_facet_counts(
# data, self.node_stats, COUNT_BY_CATEGORY, category_curie, facet_property
# )
[docs] def analyse_node(self, n, data):
"""
Analyse metadata of one graph node record.
Parameters
----------
n: str
Curie identifier of the node record (not used here).
data: Dict
Complete data dictionary of node record fields.
"""
if n in self.node_catalog:
# Report duplications of node records, as discerned from node id.
_parse_warning(
"Duplicate node identifier", n, "encountered in input node data"
)
return
else:
self.node_catalog[n] = list()
if "category" in data and data["category"]:
categories = data["category"]
else:
categories = ["unknown"]
print(
"Node with identifier '"
+ n
+ "' is missing its 'category' value? Tagging it as 'unknown'",
file=GraphSummary.error_log,
)
# analyse them each independently...
for category_field in categories:
self._process_category_field(category_field, n, data)
def _capture_predicate(self, data: Dict) -> Optional[str]:
if "predicate" not in data:
self.edge_stats[COUNT_BY_EDGE_PREDICATES]["unknown"]["count"] += 1
predicate = "unknown"
else:
predicate = data["predicate"]
if not _predicate_curie_regexp.fullmatch(predicate):
_parse_warning("Invalid predicate CURIE", predicate)
return None
self.edge_stats[EDGE_PREDICATES].add(predicate)
if predicate in self.edge_stats[COUNT_BY_EDGE_PREDICATES]:
self.edge_stats[COUNT_BY_EDGE_PREDICATES][predicate]["count"] += 1
else:
self.edge_stats[COUNT_BY_EDGE_PREDICATES][predicate] = {"count": 1}
if self.edge_facet_properties:
for facet_property in self.edge_facet_properties:
self.edge_stats = self.get_facet_counts(
data,
self.edge_stats,
COUNT_BY_EDGE_PREDICATES,
predicate,
facet_property,
)
return predicate
def _process_triple(
self, subject_category: str, predicate: str, object_category: str, data: Dict
):
# Process the 'valid' S-P-O triple here...
key = f"{subject_category}-{predicate}-{object_category}"
if key in self.edge_stats[COUNT_BY_SPO]:
self.edge_stats[COUNT_BY_SPO][key]["count"] += 1
else:
self.edge_stats[COUNT_BY_SPO][key] = {"count": 1}
if self.edge_facet_properties:
for facet_property in self.edge_facet_properties:
self.edge_stats = self.get_facet_counts(
data, self.edge_stats, COUNT_BY_SPO, key, facet_property
)
[docs] def analyse_edge(self, u: str, v: str, k: str, data: Dict):
"""
Analyse metadata of one graph edge record.
Parameters
----------
u: str
Subject node curie identifier of the edge.
v: str
Subject node curie identifier of the edge.
k: str
Key identifier of the edge record (not used here).
data: Dict
Complete data dictionary of edge record fields.
"""
# we blissfully now assume that all the nodes of a
# graph stream were analysed first by the GraphSummary
# before the edges are analysed, thus we can test for
# node 'n' existence internally, by identifier.
self.edge_stats[TOTAL_EDGES] += 1
predicate: str = self._capture_predicate(data)
if u not in self.node_catalog:
_parse_warning("Edge 'subject' node ID", u, "not found in node catalog")
# removing from edge count
self.edge_stats[TOTAL_EDGES] -= 1
self.edge_stats[COUNT_BY_EDGE_PREDICATES]["unknown"]["count"] -= 1
return
for subj_cat_idx in self.node_catalog[u]:
subject_category = self.Category._get_category_curie_by_index(subj_cat_idx)
if v not in self.node_catalog:
_parse_warning("Edge 'object' node ID", v, "not found in node catalog")
self.edge_stats[TOTAL_EDGES] -= 1
self.edge_stats[COUNT_BY_EDGE_PREDICATES]["unknown"]["count"] -= 1
return
for obj_cat_idx in self.node_catalog[v]:
object_category = self.Category._get_category_curie_by_index(
obj_cat_idx
)
self._process_triple(subject_category, predicate, object_category, data)
def _compile_prefix_stats_by_category(self, category_curie: str):
for prefix in self.node_stats[COUNT_BY_ID_PREFIXES_BY_CATEGORY][category_curie]:
if prefix not in self.node_stats[COUNT_BY_ID_PREFIXES]:
self.node_stats[COUNT_BY_ID_PREFIXES][prefix] = 0
self.node_stats[COUNT_BY_ID_PREFIXES][prefix] += self.node_stats[
COUNT_BY_ID_PREFIXES_BY_CATEGORY
][category_curie][prefix]
def _compile_category_stats(self, node_category: Category):
category_curie = node_category.get_name()
self.node_stats[COUNT_BY_CATEGORY][category_curie][
"count"
] = node_category.get_count()
id_prefixes = node_category.get_id_prefixes()
self.node_stats[NODE_ID_PREFIXES_BY_CATEGORY][category_curie] = id_prefixes
self.node_stats[NODE_ID_PREFIXES].update(id_prefixes)
self.node_stats[COUNT_BY_ID_PREFIXES_BY_CATEGORY][
category_curie
] = node_category.get_count_by_id_prefixes()
self._compile_prefix_stats_by_category(category_curie)
[docs] def get_node_stats(self) -> Dict[str, Any]:
"""
Returns
-------
Dict[str, Any]
Statistics for the nodes in the graph.
"""
if not self.nodes_processed:
self.nodes_processed = True
for node_category in self.node_categories.values():
self._compile_category_stats(node_category)
self.node_stats[NODE_CATEGORIES] = sorted(
list(self.node_stats[NODE_CATEGORIES])
)
if self.node_facet_properties:
for facet_property in self.node_facet_properties:
self.node_stats[facet_property] = sorted(
list(self.node_stats[facet_property])
)
if not self.node_stats[TOTAL_NODES]:
self.node_stats[TOTAL_NODES] = len(self.node_catalog)
return self.node_stats
    def add_node_stat(self, tag: str, value: Any):
        """
        Compile/add a nodes statistic for a given tag = value annotation of the node.

        Parameters
        ----------
        tag: str
            Tag label for the annotation.
        value: Any
            Value of the specific tag annotation.
        """
        self.node_stats[tag] = value
def get_edge_stats(self) -> Dict[str, Any]:
# Not sure if this is "safe" but assume that edge_stats may be finalized
# and cached once after the first time the edge stats are accessed
if not self.edges_processed:
self.edges_processed = True
self.edge_stats[EDGE_PREDICATES] = sorted(
list(self.edge_stats[EDGE_PREDICATES])
)
if self.edge_facet_properties:
for facet_property in self.edge_facet_properties:
self.edge_stats[facet_property] = sorted(
list(self.edge_stats[facet_property])
)
return self.edge_stats
    def _wrap_graph_stats(
        self,
        graph_name: str,
        node_stats: Dict[str, Any],
        edge_stats: Dict[str, Any],
    ):
        # Utility wrapper function to support DRY code below: assemble the
        # top-level graph statistics dictionary, caching it on first use so
        # repeated calls return the same object.
        if not self.graph_stats:
            self.graph_stats = {
                "graph_name": graph_name,
                "node_stats": node_stats,
                "edge_stats": edge_stats,
            }
        return self.graph_stats
[docs] def get_graph_summary(self, name: str = None, **kwargs) -> Dict:
"""
Similar to summarize_graph except that the node and edge statistics are already captured
in the GraphSummary class instance (perhaps by Transformer.process() stream inspection)
and therefore, the data structure simply needs to be 'finalized' for saving or similar use.
Parameters
----------
name: Optional[str]
Name for the graph (if being renamed)
kwargs: Dict
Any additional arguments (ignored in this method at present)
Returns
-------
Dict
A knowledge map dictionary corresponding to the graph
"""
return self._wrap_graph_stats(
graph_name=name if name else self.name,
node_stats=self.get_node_stats(),
edge_stats=self.get_edge_stats(),
)
[docs] def summarize_graph(self, graph: BaseGraph) -> Dict:
"""
Summarize the entire graph.
Parameters
----------
graph: kgx.graph.base_graph.BaseGraph
The graph
Returns
-------
Dict
The stats dictionary
"""
return self._wrap_graph_stats(
graph_name=self.name if self.name else graph.name,
node_stats=self.summarize_graph_nodes(graph),
edge_stats=self.summarize_graph_edges(graph),
)
[docs] def summarize_graph_nodes(self, graph: BaseGraph) -> Dict:
"""
Summarize the nodes in a graph.
Parameters
----------
graph: kgx.graph.base_graph.BaseGraph
The graph
Returns
-------
Dict
The node stats
"""
for n, data in graph.nodes(data=True):
self.analyse_node(n, data)
return self.get_node_stats()
[docs] def summarize_graph_edges(self, graph: BaseGraph) -> Dict:
"""
Summarize the edges in a graph.
Parameters
----------
graph: kgx.graph.base_graph.BaseGraph
The graph
Returns
-------
Dict
The edge stats
"""
for u, v, k, data in graph.edges(keys=True, data=True):
self.analyse_edge(u, v, k, data)
return self.get_edge_stats()
def _compile_facet_stats(
self, stats: Dict, x: str, y: str, facet_property: str, value: str
):
if facet_property not in stats[x][y]:
stats[x][y][facet_property] = {}
if value in stats[x][y][facet_property]:
stats[x][y][facet_property][value]["count"] += 1
else:
stats[x][y][facet_property][value] = {"count": 1}
stats[facet_property].update([value])
[docs] def get_facet_counts(
self, data: Dict, stats: Dict, x: str, y: str, facet_property: str
) -> Dict:
"""
Facet on ``facet_property`` and record the count for ``stats[x][y][facet_property]``.
Parameters
----------
data: dict
Node/edge data dictionary
stats: dict
The stats dictionary
x: str
first key
y: str
second key
facet_property: str
The property to facet on
Returns
-------
Dict
The stats dictionary
"""
if facet_property in data:
if isinstance(data[facet_property], list):
for k in data[facet_property]:
self._compile_facet_stats(stats, x, y, facet_property, k)
else:
k = data[facet_property]
self._compile_facet_stats(stats, x, y, facet_property, k)
else:
self._compile_facet_stats(stats, x, y, facet_property, "unknown")
return stats
[docs] def save(self, file, name: str = None, file_format: str = "yaml"):
"""
Save the current GraphSummary to a specified (open) file (device).
Parameters
----------
file: File
Text file handler open for writing.
name: str
Optional string to which to (re-)name the graph.
file_format: str
Text output format ('json' or 'yaml') for the saved meta knowledge graph (default: 'json')
Returns
-------
None
"""
stats = self.get_graph_summary(name)
if not file_format or file_format == "yaml":
yaml.dump(stats, file)
else:
dump(stats, file, indent=4, default=gs_default)
def generate_graph_stats(
    graph: BaseGraph,
    graph_name: str,
    filename: str,
    node_facet_properties: Optional[List] = None,
    edge_facet_properties: Optional[List] = None,
) -> None:
    """
    Generate stats from Graph.

    Parameters
    ----------
    graph: kgx.graph.base_graph.BaseGraph
        The graph
    graph_name: str
        Name for the graph
    filename: str
        Filename to write the stats to
    node_facet_properties: Optional[List]
        A list of properties to facet on. For example, ``['provided_by']``
    edge_facet_properties: Optional[List]
        A list of properties to facet on. For example, ``['knowledge_source']``
    """
    stats = summarize_graph(
        graph, graph_name, node_facet_properties, edge_facet_properties
    )
    # Persist the compiled statistics as YAML.
    with open(filename, "w") as stats_file:
        yaml.dump(stats, stats_file)
def summarize_graph(
    graph: BaseGraph,
    name: str = None,
    node_facet_properties: Optional[List] = None,
    edge_facet_properties: Optional[List] = None,
) -> Dict:
    """
    Summarize the entire graph.

    Parameters
    ----------
    graph: kgx.graph.base_graph.BaseGraph
        The graph
    name: str
        Name for the graph
    node_facet_properties: Optional[List]
        A list of properties to facet on. For example, ``['provided_by']``
    edge_facet_properties: Optional[List]
        A list of properties to facet on. For example, ``['knowledge_source']``

    Returns
    -------
    Dict
        The stats dictionary
    """
    summary = GraphSummary(name, node_facet_properties, edge_facet_properties)
    return summary.summarize_graph(graph)