# Source code for kgx.source.owl_source

from typing import Set, Optional, Generator, Any, Dict

import rdflib
from rdflib import Namespace, URIRef, OWL, RDFS, RDF

from kgx.config import get_logger
from kgx.source import RdfSource
from kgx.utils.kgx_utils import (
    current_time_in_millis,
    generate_uuid,
    generate_edge_identifiers,
    validate_node,
    sanitize_import,
    validate_edge,
)

# Module-level logger (from kgx.config.get_logger) used by OwlSource below.
log = get_logger()


class OwlSource(RdfSource):
    """
    OwlSource is responsible for parsing an OWL ontology.

    .. note::
        This is a simple parser that loads direct class-class relationships.
        For more formal OWL parsing, refer to Robot: http://robot.obolibrary.org/

    """

    def __init__(self):
        # IRIs of ontologies already pulled in through owl:imports, so that
        # the same import is not parsed twice by this instance.
        self.imported: Set = set()
        super().__init__()
        self.OWLSTAR = Namespace("http://w3id.org/owlstar/")
        # Predicates that are never turned into records by load_graph().
        self.excluded_predicates = {
            URIRef(
                "https://raw.githubusercontent.com/geneontology/go-ontology/master/contrib/oboInOwl#id"
            )
        }

    def parse(
        self,
        filename: str,
        format: str = "owl",
        compression: Optional[str] = None,
        **kwargs: Any,
    ) -> Generator:
        """
        This method reads from an OWL and yields records.

        Records coming from ``owl:imports`` targets are yielded first (one
        imported ontology at a time), followed by the records of the
        ontology in ``filename`` itself.

        Parameters
        ----------
        filename: str
            The filename to parse
        format: str
            The format (``owl``). ``owl`` is handed to rdflib as ``xml``;
            ``None`` lets rdflib guess the format from ``filename``.
        compression: Optional[str]
            The compression type (``gz``). Not supported here: a warning is
            logged and the value is otherwise ignored.
        kwargs: Any
            Any additional arguments; passed to ``set_provenance_map``

        Returns
        -------
        Generator
            A generator for node and edge records read from the file

        """
        rdfgraph = rdflib.Graph()
        if compression:
            log.warning(f"compression mode '{compression}' not supported by OwlSource")
        if format is None:
            format = rdflib.util.guess_format(filename)

        if format == "owl":
            format = "xml"

        log.info(f"Parsing {filename} with '{format}' format")
        rdfgraph.parse(filename, format=format)
        log.info(f"{filename} parsed with {len(rdfgraph)} triples")

        self.set_provenance_map(kwargs)

        self.start = current_time_in_millis()
        log.info(f"Done parsing {filename}")

        # Load all imports first. The triple pattern already restricts the
        # predicate to owl:imports, so only the object needs inspecting.
        for _, _, o in rdfgraph.triples((None, OWL.imports, None)):
            if o in self.imported:
                log.warning(f"Trying to import {o} but its already done")
                continue
            input_format = rdflib.util.guess_format(o)
            imported_rdfgraph = rdflib.Graph()
            log.info(f"Parsing OWL import: {o}")
            self.imported.add(o)
            imported_rdfgraph.parse(o, format=input_format)
            # load_graph() is a generator function: it has to be iterated
            # for the imported ontology to be processed at all. A bare call
            # would only create a generator object and discard it.
            yield from self.load_graph(imported_rdfgraph)

        yield from self.load_graph(rdfgraph)

    def load_graph(self, rdfgraph: rdflib.Graph, **kwargs: Any) -> Generator:
        """
        Walk through the rdflib.Graph, feed its triples to ``self.triple``
        and yield the resulting node and edge records.

        This is a generator: nothing happens until it is iterated. It may be
        called more than once on the same instance (e.g. once per imported
        ontology); the node cache, edge cache and set of reified nodes are
        emptied at the end of every call.

        Parameters
        ----------
        rdfgraph: rdflib.Graph
            Graph containing nodes and edges
        kwargs: Any
            Any additional arguments (currently unused)

        Returns
        -------
        Generator
            Whatever ``self.triple`` yields while triples are processed,
            then ``(key, node_data)`` tuples for cached nodes and
            ``(k[0], k[1], k[2], edge_data)`` tuples for cached edges.

        """
        # Predicates already handled by a dedicated pass below; the final
        # catch-all pass skips triples whose predicate is in this set.
        seen = set()

        seen.add(RDFS.subClassOf)
        for s, p, o in rdfgraph.triples((None, RDFS.subClassOf, None)):
            # ignoring blank nodes
            if isinstance(s, rdflib.term.BNode):
                continue
            pred = None
            parent = None
            os_interpretation = None
            if isinstance(o, rdflib.term.BNode):
                # C SubClassOf R some D
                # If several values exist for a property, the last one
                # returned by rdflib wins.
                for x in rdfgraph.objects(o, OWL.onProperty):
                    pred = x
                # owl:someValuesFrom
                for x in rdfgraph.objects(o, OWL.someValuesFrom):
                    os_interpretation = self.OWLSTAR.term("AllSomeInterpretation")
                    parent = x
                # owl:allValuesFrom
                for x in rdfgraph.objects(o, OWL.allValuesFrom):
                    os_interpretation = self.OWLSTAR.term("AllOnlyInterpretation")
                    parent = x
                if pred is None or parent is None:
                    # Not a restriction shape this parser understands.
                    log.warning(
                        f"{s} {p} {o} has OWL.onProperty {pred} and OWL.someValuesFrom {parent}"
                    )
                    log.warning(f"Do not know how to handle BNode: {o}")
                    continue
            else:
                # C rdfs:subClassOf D (where C and D are named classes)
                pred = p
                parent = o

            if os_interpretation:
                # reify edges that have logical interpretation
                eid = generate_uuid()
                self.reified_nodes.add(eid)
                edge_node = URIRef(eid)
                yield from self.triple(
                    edge_node, self.BIOLINK.term("category"), self.BIOLINK.Association
                )
                yield from self.triple(edge_node, self.BIOLINK.term("subject"), s)
                yield from self.triple(edge_node, self.BIOLINK.term("predicate"), pred)
                yield from self.triple(edge_node, self.BIOLINK.term("object"), parent)
                yield from self.triple(
                    edge_node,
                    self.BIOLINK.term("logical_interpretation"),
                    os_interpretation,
                )
            else:
                yield from self.triple(s, pred, parent)

        seen.add(OWL.equivalentClass)
        for s, p, o in rdfgraph.triples((None, OWL.equivalentClass, None)):
            # A owl:equivalentClass B (where A and B are named classes)
            if not isinstance(o, rdflib.term.BNode):
                yield from self.triple(s, p, o)

        # Every declared object property: emit its own annotations and mark
        # it as seen so that its usages as a predicate are skipped below.
        for relation in rdfgraph.subjects(RDF.type, OWL.ObjectProperty):
            seen.add(relation)
            for s, p, o in rdfgraph.triples((relation, None, None)):
                if isinstance(o, rdflib.term.BNode):
                    continue
                if p not in self.excluded_predicates:
                    yield from self.triple(s, p, o)

        # Catch-all pass over the remaining triples.
        # NOTE(review): `seen` is compared against the predicate only, so
        # triples whose *subject* is an object property are visited here a
        # second time - confirm this is intended.
        for s, p, o in rdfgraph.triples((None, None, None)):
            if isinstance(s, rdflib.term.BNode) or isinstance(o, rdflib.term.BNode):
                continue
            if p in seen:
                continue
            if p in self.excluded_predicates:
                continue
            yield from self.triple(s, p, o)

        # Turn reified association nodes back into edges.
        for n in self.reified_nodes:
            data = self.node_cache.pop(n)
            self.dereify(n, data)
        # The ids above no longer exist in node_cache; forget them so that a
        # later load_graph() call on this instance does not try to pop them
        # again (which would raise KeyError).
        self.reified_nodes.clear()

        for k, data in self.node_cache.items():
            node_data = validate_node(data)
            node_data = sanitize_import(node_data)
            self.set_node_provenance(node_data)
            if self.check_node_filter(node_data):
                self.node_properties.update(node_data.keys())
                yield k, node_data
        self.node_cache.clear()

        for k, data in self.edge_cache.items():
            edge_data = validate_edge(data)
            edge_data = sanitize_import(edge_data)
            self.set_edge_provenance(edge_data)
            if self.check_edge_filter(edge_data):
                self.edge_properties.update(edge_data.keys())
                yield k[0], k[1], k[2], edge_data
        self.edge_cache.clear()