Source code for kgx.source.trapi_source

import gzip
import ijson
from itertools import chain
from typing import Dict, Tuple, Generator, Optional, Any

from kgx.source.json_source import JsonSource


# TODO: update for TRAPI 1.0 spec


class TrapiSource(JsonSource):
    """
    TrapiSource is responsible for reading data as records
    from a TRAPI JSON.

    Node records are streamed from ``knowledge_graph.nodes`` and edge
    records from ``knowledge_graph.edges`` using ``ijson``, so the input
    document is not loaded into memory as a whole by this class.
    """

    def __init__(self):
        super().__init__()
        # Initialised empty here; nothing in this class populates them.
        self._node_properties = set()
        self._edge_properties = set()

    def parse(
        self,
        filename: str,
        format: str = "json",
        compression: Optional[str] = None,
        **kwargs: Any
    ) -> Generator:
        """
        This method reads from a JSON and yields records.

        All node records are yielded first, followed by all edge records.
        The file is therefore read twice: once for nodes, once for edges.

        Parameters
        ----------
        filename: str
            The filename to parse
        format: str
            The format (``trapi-json``). Accepted for interface
            compatibility; this method does not read it.
        compression: Optional[str]
            The compression type (``gz``)
        kwargs: Any
            Any additional arguments; passed to ``set_provenance_map``

        Returns
        -------
        Generator
            A generator for node and edge records

        """
        self.set_provenance_map(kwargs)

        node_records = self.read_nodes(filename, compression)
        edge_records = self.read_edges(filename, compression)
        yield from chain(node_records, edge_records)

    def read_nodes(self, filename: str, compression: Optional[str] = None) -> Generator:
        """
        Read node records from a JSON.

        Parameters
        ----------
        filename: str
            The filename to read from
        compression: Optional[str]
            The compression type. ``gz`` opens the file with ``gzip``;
            any other value opens it as a plain file.

        Returns
        -------
        Generator
            A generator for node records

        """
        opener = gzip.open if compression == "gz" else open
        # The ``with`` block guarantees the handle is closed when the
        # generator is exhausted, closed early, or an error propagates.
        with opener(filename, "rb") as fh:
            for node in ijson.items(fh, "knowledge_graph.nodes.item"):
                yield self.load_node(node)

    def read_edges(self, filename: str, compression: Optional[str] = None) -> Generator:
        """
        Read edge records from a JSON.

        Parameters
        ----------
        filename: str
            The filename to read from
        compression: Optional[str]
            The compression type. ``gz`` opens the file with ``gzip``;
            any other value opens it as a plain file.

        Returns
        -------
        Generator
            A generator for edge records

        """
        opener = gzip.open if compression == "gz" else open
        # The ``with`` block guarantees the handle is closed when the
        # generator is exhausted, closed early, or an error propagates.
        with opener(filename, "rb") as fh:
            for edge in ijson.items(fh, "knowledge_graph.edges.item"):
                yield self.load_edge(edge)

    def load_node(self, node: Dict) -> Tuple[str, Dict]:
        """
        Normalise a node record and hand it to ``JsonSource.read_node``.

        .. Note::
            This method transforms Reasoner Std API format fields
            to Biolink Model fields.

        Parameters
        ----------
        node : Dict
            A node. Modified in place: when it has a ``type`` key and no
            ``category`` key, ``type`` is renamed to ``category``.

        Returns
        -------
        Tuple[str, Dict]
            The value returned by the parent class ``read_node``

        """
        if "type" in node and "category" not in node:
            # Rename ``type`` to ``category`` without clobbering an
            # existing ``category`` value.
            node["category"] = node.pop("type")
        return super().read_node(node)

    def load_edge(self, edge: Dict) -> Tuple[str, str, str, Dict]:
        """
        Normalise an edge record and hand it to ``JsonSource.read_edge``.

        .. Note::
            This method transforms Reasoner Std API format fields
            to Biolink Model fields.

        Parameters
        ----------
        edge : Dict
            An edge. Modified in place: ``source_id`` is copied to
            ``subject``, ``target_id`` to ``object``, and the first
            element of ``relation_label`` to ``predicate``. The original
            keys are kept.

        Returns
        -------
        Tuple[str, str, str, Dict]
            The value returned by the parent class ``read_edge``

        """
        if "source_id" in edge:
            edge["subject"] = edge["source_id"]
        if "target_id" in edge:
            edge["object"] = edge["target_id"]
        if "relation_label" in edge:
            # NOTE(review): presumably ``relation_label`` is a non-empty
            # list of labels; a plain string would yield only its first
            # character and an empty list raises IndexError -- confirm
            # against the data producers.
            edge["predicate"] = edge["relation_label"][0]
        return super().read_edge(edge)