import gzip
from itertools import chain
from typing import Optional, Dict, Generator, Any
import ijson
import stringcase
from bmt import Toolkit
from kgx.prefix_manager import PrefixManager
from kgx.config import get_logger
from kgx.source.json_source import JsonSource
from kgx.utils.kgx_utils import get_biolink_element, format_biolink_slots
# Module-level logger obtained from kgx.config; used below to report
# predicate-mapping errors and missing category mappings.
log = get_logger()
class ObographSource(JsonSource):
    """
    ObographSource is responsible for reading data as records
    from an OBO Graph JSON.

    Nodes are streamed from ``graphs.item.nodes.item`` and edges from
    ``graphs.item.edges.item`` using ``ijson``, so the whole document is
    never loaded into memory at once.
    """

    HAS_OBO_NAMESPACE = "http://www.geneontology.org/formats/oboInOwl#hasOBONamespace"
    SKOS_EXACT_MATCH = "http://www.w3.org/2004/02/skos/core#exactMatch"

    # Non-IRI predicates with a fixed (predicate, relation) translation.
    _SHORTHAND_PREDICATES = {
        "is_a": ("biolink:subclass_of", "rdfs:subClassOf"),
        "has_part": ("biolink:has_part", "BFO:0000051"),
        "part_of": ("biolink:part_of", "BFO:0000050"),
    }

    # Fallback category keyed by CURIE prefix.
    # TODO: the mapping should be via biolink-model lookups
    _PREFIX_TO_CATEGORY = {
        "HP": "biolink:PhenotypicFeature",
        "CHEBI": "biolink:ChemicalSubstance",
        "MONDO": "biolink:Disease",
        "UBERON": "biolink:AnatomicalEntity",
        "SO": "biolink:SequenceFeature",
        "CL": "biolink:Cell",
        "PR": "biolink:Protein",
        "NCBITaxon": "biolink:OrganismalEntity",
    }

    def __init__(self):
        super().__init__()
        self.toolkit = Toolkit()
        # Cache of predicate CURIE -> resolved Biolink predicate, so each
        # distinct predicate is looked up in the model only once.
        self.ecache: Dict = {}

    def parse(
        self,
        filename: str,
        format: str = "json",
        compression: Optional[str] = None,
        **kwargs: Any,
    ) -> Generator:
        """
        This method reads from JSON and yields records.

        All node records are yielded first, followed by all edge records.

        Parameters
        ----------
        filename: str
            The filename to parse
        format: str
            The format (``json``); not consulted by this method
        compression: Optional[str]
            The compression type (``gz``)
        kwargs: Any
            Any additional arguments; passed to ``set_provenance_map``

        Returns
        -------
        Generator
            A generator for records

        """
        self.set_provenance_map(kwargs)
        nodes = self.read_nodes(filename, compression)
        edges = self.read_edges(filename, compression)
        yield from chain(nodes, edges)

    @staticmethod
    def _open(filename: str, compression: Optional[str] = None):
        """
        Open ``filename`` for binary reading, through gzip when
        ``compression`` is ``gz``.
        """
        opener = gzip.open if compression == "gz" else open
        return opener(filename, "rb")

    def read_nodes(self, filename: str, compression: Optional[str] = None) -> Generator:
        """
        Read node records from a JSON.

        The file is closed once the generator is exhausted or closed.

        Parameters
        ----------
        filename: str
            The filename to read from
        compression: Optional[str]
            The compression type

        Returns
        -------
        Generator
            A generator for node records

        """
        with self._open(filename, compression) as fh:
            for node in ijson.items(fh, "graphs.item.nodes.item"):
                yield self.read_node(node)

    def read_node(self, node: Dict) -> Dict:
        """
        Read and parse a node record.

        Parameters
        ----------
        node: Dict
            The node record

        Returns
        -------
        Dict
            The processed node

        """
        curie = self.prefix_manager.contract(node["id"])
        node_properties = {}
        if "meta" in node:
            # NOTE(review): parse_meta is not defined in this part of the
            # file; presumably it is defined further down or inherited.
            node_properties = self.parse_meta(node["id"], node["meta"])

        fixed_node = {"id": curie}
        if "lbl" in node:
            fixed_node["name"] = node["lbl"]
        fixed_node["iri"] = node["id"]

        # (key in parsed meta, key in output record)
        for meta_key, record_key in (
            ("description", "description"),
            ("synonym", "synonym"),
            ("xrefs", "xref"),
            ("subsets", "subsets"),
        ):
            if meta_key in node_properties:
                fixed_node[record_key] = node_properties[meta_key]

        if "category" not in node:
            category = self.get_category(curie, node)
            fixed_node["category"] = [category] if category else ["biolink:OntologyClass"]

        # Equivalent nodes are recorded on the node itself as ``same_as``;
        # no separate same_as edges are emitted.
        if "equivalent_nodes" in node_properties:
            fixed_node["same_as"] = node_properties["equivalent_nodes"]
        return super().read_node(fixed_node)

    def read_edges(self, filename: str, compression: Optional[str] = None) -> Generator:
        """
        Read edge records from a JSON.

        The file is closed once the generator is exhausted or closed.

        Parameters
        ----------
        filename: str
            The filename to read from
        compression: Optional[str]
            The compression type

        Returns
        -------
        Generator
            A generator for edge records

        """
        with self._open(filename, compression) as fh:
            for edge in ijson.items(fh, "graphs.item.edges.item"):
                yield self.read_edge(edge)

    def _resolve_predicate(self, curie: str, iri: str) -> str:
        """
        Return the Biolink predicate for a predicate CURIE, using and
        updating ``self.ecache``. Falls back to ``biolink:related_to``
        when no Biolink element can be found.
        """
        if curie in self.ecache:
            return self.ecache[curie]

        element = get_biolink_element(curie)
        if not element:
            try:
                mapping = self.toolkit.get_element_by_mapping(iri)
                if mapping:
                    element = self.toolkit.get_element(mapping)
            except ValueError as e:
                log.error(e)

        if element:
            edge_predicate = format_biolink_slots(element.name.replace(",", ""))
        else:
            edge_predicate = "biolink:related_to"
        self.ecache[curie] = edge_predicate
        return edge_predicate

    def read_edge(self, edge: Dict) -> Dict:
        """
        Read and parse an edge record.

        IRI predicates are contracted to a CURIE (kept as ``relation``) and
        resolved to a Biolink predicate. Non-IRI predicates are translated
        through a small fixed table, or prefixed with ``biolink:`` with
        spaces replaced by underscores.

        Parameters
        ----------
        edge: Dict
            The edge record

        Returns
        -------
        Dict
            The processed edge

        """
        pred = edge["pred"]
        fixed_edge = {"subject": self.prefix_manager.contract(edge["sub"])}

        if PrefixManager.is_iri(pred):
            curie = self.prefix_manager.contract(pred)
            fixed_edge["predicate"] = self._resolve_predicate(curie, pred)
            fixed_edge["relation"] = curie
        elif pred in self._SHORTHAND_PREDICATES:
            predicate, relation = self._SHORTHAND_PREDICATES[pred]
            fixed_edge["predicate"] = predicate
            fixed_edge["relation"] = relation
        else:
            fixed_edge["predicate"] = f"biolink:{pred.replace(' ', '_')}"
            fixed_edge["relation"] = pred

        fixed_edge["object"] = self.prefix_manager.contract(edge["obj"])
        # Carry over every remaining property of the raw edge unchanged.
        for key, value in edge.items():
            if key not in {"sub", "pred", "obj"}:
                fixed_edge[key] = value
        return super().read_edge(fixed_edge)

    def get_category(self, curie: str, node: dict) -> Optional[str]:
        """
        Get category for a given CURIE.

        The category is first derived from the node's ``hasOBONamespace``
        basic property value, if present. When that yields nothing, or only
        ``biolink:OntologyClass``, a fixed table keyed by CURIE prefix is
        consulted instead.

        Parameters
        ----------
        curie: str
            Curie for node
        node: dict
            Node data

        Returns
        -------
        Optional[str]
            Category for the given node CURIE. ``None`` when the node has no
            ``hasOBONamespace`` value and its prefix is not in the table.

        """
        category = None
        # use meta.basicPropertyValues
        if "meta" in node and "basicPropertyValues" in node["meta"]:
            for p in node["meta"]["basicPropertyValues"]:
                if p["pred"] != self.HAS_OBO_NAMESPACE:
                    continue
                namespace = p["val"]
                element = self.toolkit.get_element(namespace)
                if not element:
                    # get_element_by_mapping yields an element name, which
                    # must be resolved to an element before reading .name
                    # (same two-step lookup as for edge predicates).
                    mapping = self.toolkit.get_element_by_mapping(namespace)
                    if mapping:
                        element = self.toolkit.get_element(mapping)
                if element:
                    category = f"biolink:{stringcase.pascalcase(stringcase.snakecase(element.name))}"
                else:
                    category = "biolink:OntologyClass"

        if not category or category == "biolink:OntologyClass":
            prefix = PrefixManager.get_prefix(curie)
            prefix_category = self._PREFIX_TO_CATEGORY.get(prefix)
            if prefix_category:
                category = prefix_category
            else:
                log.debug(
                    f"{curie} Could not find a category mapping for '{category}'; Defaulting to 'biolink:OntologyClass'"
                )
        return category