Sink

A Sink can be implemented for any file or any local or remote store to which a graph can be written. A Sink is responsible for writing the nodes and edges of a graph.

A Sink must subclass the kgx.sink.sink.Sink class and must implement the following methods:

  • __init__

  • write_node

  • write_edge

  • finalize

__init__ method

The __init__ method is used to instantiate a Sink with the configuration required for writing to a store.

  • In the case of files, the __init__ method will take the filename and format as arguments.

  • In the case of a graph store like Neo4j, the __init__ method will take the uri, username, and password as arguments.

The __init__ method also has an optional kwargs argument, which can be used to supply a variable number of arguments to this method, depending on the requirements of the store for which the Sink is being implemented.

write_node method

  • Responsible for receiving a node record and writing it to a file/store

write_edge method

  • Responsible for receiving an edge record and writing it to a file/store

finalize method

Any operation that needs to be performed after writing all the nodes and edges to a file/store must be defined in this method.

For example,

  • kgx.sink.tsv_sink.TsvSink has a finalize method that closes the file handles and creates an archive, if compression is desired

  • kgx.sink.neo_sink.NeoSink has a finalize method that writes any cached node and edge records
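The contract described above can be sketched as follows. This is a minimal illustration, not KGX code: the Sink class here is a stand-in for kgx.sink.sink.Sink so that the example runs without KGX installed, and ListSink, its name argument, and the sort_on_finalize option are hypothetical. Method names follow the API reference for the base class (write_node, write_edge, finalize).

```python
from typing import Any, Dict, List


class Sink:
    """Stand-in for kgx.sink.sink.Sink (assumption: same three methods)."""

    def write_node(self, record: Any) -> None:
        raise NotImplementedError

    def write_edge(self, record: Any) -> None:
        raise NotImplementedError

    def finalize(self) -> None:
        raise NotImplementedError


class ListSink(Sink):
    """Hypothetical sink that collects records in memory."""

    def __init__(self, name: str, **kwargs: Any) -> None:
        self.name = name
        # Store-specific options arrive through kwargs.
        self.sort_on_finalize = kwargs.get("sort_on_finalize", False)
        self.nodes: List[Dict] = []
        self.edges: List[Dict] = []
        self.finalized = False

    def write_node(self, record: Dict) -> None:
        self.nodes.append(record)

    def write_edge(self, record: Dict) -> None:
        self.edges.append(record)

    def finalize(self) -> None:
        # Work that must happen after all records have been written.
        if self.sort_on_finalize:
            self.nodes.sort(key=lambda r: r["id"])
        self.finalized = True


sink = ListSink("demo", sort_on_finalize=True)
sink.write_node({"id": "HGNC:2", "name": "gene B"})
sink.write_node({"id": "HGNC:1", "name": "gene A"})
sink.write_edge(
    {"subject": "HGNC:1", "predicate": "biolink:related_to", "object": "HGNC:2"}
)
sink.finalize()
```

In a real implementation the class would subclass kgx.sink.sink.Sink directly, and finalize would release whatever resources the store holds.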

kgx.sink.sink

Base class for all Sinks in KGX.

class kgx.sink.sink.Sink

Bases: object

A Sink is responsible for writing data as records to a store where the store is a file or a database.

finalize() → None

Operations that ought to be done after writing all the incoming data should be called by this method.

set_reverse_prefix_map(m: Dict) → None

Update default reverse prefix map.

Parameters

m (Dict) – A dictionary with IRI to prefix mappings
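As an illustration of the shape of such a dictionary, the sketch below maps IRI bases to prefixes and uses the map to contract an IRI into a CURIE. The contract function and the example entries are hypothetical and are not taken from KGX; how a given Sink actually uses the map may differ.

```python
from typing import Dict, Optional

# Keys are IRI bases, values are the prefixes they map to.
reverse_prefix_map: Dict[str, str] = {
    "http://purl.obolibrary.org/obo/HP_": "HP",
    "http://purl.obolibrary.org/obo/MONDO_": "MONDO",
}


def contract(iri: str, rpm: Dict[str, str]) -> Optional[str]:
    """Return a CURIE for iri using the longest matching IRI base, else None."""
    matches = [base for base in rpm if iri.startswith(base)]
    if not matches:
        return None
    base = max(matches, key=len)
    return f"{rpm[base]}:{iri[len(base):]}"


curie = contract("http://purl.obolibrary.org/obo/HP_0000118", reverse_prefix_map)
unknown = contract("http://example.org/x", reverse_prefix_map)
```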

write_edge(record) → None

Write an edge record to the underlying store.

Parameters

record (Any) – An edge record

write_node(record) → None

Write a node record to the underlying store.

Parameters

record (Any) – A node record

kgx.sink.graph_sink

GraphSink is responsible for writing to an instance of kgx.graph.base_graph.BaseGraph and must use only the methods exposed by BaseGraph to access the graph.

class kgx.sink.graph_sink.GraphSink(graph: kgx.graph.base_graph.BaseGraph = None)

Bases: kgx.sink.sink.Sink

GraphSink is responsible for writing data as records to an in memory graph representation.

The underlying store is determined by the graph store class defined in config (kgx.graph.nx_graph.NxGraph, by default).

Parameters

graph (kgx.graph.base_graph.BaseGraph) – An instance of BaseGraph to write to

finalize() → None

Perform any operations after writing nodes and edges to graph.

set_reverse_prefix_map(m: Dict) → None

Update default reverse prefix map.

Parameters

m (Dict) – A dictionary with IRI to prefix mappings

write_edge(record: Dict) → None

Write an edge record to graph.

Parameters

record (Dict) – An edge record

write_node(record: Dict) → None

Write a node record to graph.

Parameters

record (Dict) – A node record

kgx.sink.tsv_sink

TsvSink is responsible for writing a KGX formatted CSV or TSV using Pandas.

KGX writes two separate files - one for nodes and another for edges.

class kgx.sink.tsv_sink.TsvSink(filename: str, format: str, compression: Optional[str] = None, **kwargs: Any)

Bases: kgx.sink.sink.Sink

TsvSink is responsible for writing data as records to a TSV/CSV.

Parameters
  • filename (str) – The filename to write to

  • format (str) – The file format (tsv, csv)

  • compression (str) – The compression type (tar, tar.gz)

  • kwargs (Any) – Any additional arguments

finalize() → None

Close file handles and create an archive if compression mode is defined.

set_edge_properties(edge_properties: List) → None

Update edge properties index with a given list.

Parameters

edge_properties (List) – A list of edge properties

set_node_properties(node_properties: List) → None

Update node properties index with a given list.

Parameters

node_properties (List) – A list of node properties

set_reverse_prefix_map(m: Dict) → None

Update default reverse prefix map.

Parameters

m (Dict) – A dictionary with IRI to prefix mappings

write_edge(record: Dict) → None

Write an edge record to the underlying store.

Parameters

record (Dict) – An edge record

write_node(record: Dict) → None

Write a node record to the underlying store.

Parameters

record (Dict) – A node record
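The two-file layout and the archive step performed on finalize can be sketched with the standard library alone. This is not the TsvSink implementation: it uses the csv module in place of Pandas, and the TwoFileTsvWriter class, the _nodes.tsv and _edges.tsv suffixes, and the compress flag are assumptions made for the illustration.

```python
import csv
import os
import tarfile
import tempfile
from typing import Dict, List


class TwoFileTsvWriter:
    """Hypothetical stand-in; not the KGX TsvSink implementation."""

    def __init__(
        self, basename: str, node_properties: List[str], edge_properties: List[str]
    ) -> None:
        self.nodes_path = f"{basename}_nodes.tsv"
        self.edges_path = f"{basename}_edges.tsv"
        self.archive_path = f"{basename}.tar.gz"
        self._nfh = open(self.nodes_path, "w", newline="", encoding="utf-8")
        self._efh = open(self.edges_path, "w", newline="", encoding="utf-8")
        # The property lists fix the column order of each file.
        options = dict(delimiter="\t", lineterminator="\n", extrasaction="ignore")
        self._nodes = csv.DictWriter(self._nfh, fieldnames=node_properties, **options)
        self._edges = csv.DictWriter(self._efh, fieldnames=edge_properties, **options)
        self._nodes.writeheader()
        self._edges.writeheader()

    def write_node(self, record: Dict) -> None:
        self._nodes.writerow(record)

    def write_edge(self, record: Dict) -> None:
        self._edges.writerow(record)

    def finalize(self, compress: bool = False) -> None:
        # Close the handles first so that all rows are flushed to disk.
        self._nfh.close()
        self._efh.close()
        if compress:
            with tarfile.open(self.archive_path, "w:gz") as tar:
                tar.add(self.nodes_path, arcname=os.path.basename(self.nodes_path))
                tar.add(self.edges_path, arcname=os.path.basename(self.edges_path))


workdir = tempfile.mkdtemp()
writer = TwoFileTsvWriter(
    os.path.join(workdir, "graph"),
    node_properties=["id", "name", "category"],
    edge_properties=["subject", "predicate", "object"],
)
writer.write_node({"id": "HGNC:1", "name": "gene A", "category": "biolink:Gene"})
writer.write_edge(
    {"subject": "HGNC:1", "predicate": "biolink:related_to", "object": "HGNC:2"}
)
writer.finalize(compress=True)

with tarfile.open(writer.archive_path) as tar:
    archived = sorted(tar.getnames())
with open(writer.nodes_path, encoding="utf-8") as fh:
    node_rows = fh.read().splitlines()
```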

kgx.sink.json_sink

JsonSink is responsible for writing a KGX formatted JSON using the jsonstreams library, which allows for streaming records to the file.

class kgx.sink.json_sink.JsonSink(filename: str, format: str = 'json', compression: Optional[str] = None, **kwargs: Any)

Bases: kgx.sink.sink.Sink

JsonSink is responsible for writing data as records to a JSON.

Parameters
  • filename (str) – The filename to write to

  • format (str) – The file format (json)

  • compression (Optional[str]) – The compression type (gz)

  • kwargs (Any) – Any additional arguments

finalize() → None

Finalize by creating a compressed file, if needed.

set_reverse_prefix_map(m: Dict) → None

Update default reverse prefix map.

Parameters

m (Dict) – A dictionary with IRI to prefix mappings

write_edge(record: Dict) → None

Write an edge record to JSON.

Parameters

record (Dict) – An edge record

write_node(record: Dict) → None

Write a node record to JSON.

Parameters

record (Dict) – A node record
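The idea of streaming records to a JSON file, rather than building the whole document in memory, can be sketched without the jsonstreams library. The StreamingJsonWriter class below is hypothetical and is not how JsonSink is implemented. It assumes a top-level object with "nodes" and "edges" arrays, and it assumes that all node records arrive before any edge record.

```python
import io
import json
from typing import Dict, Optional, TextIO


class StreamingJsonWriter:
    """Hypothetical streaming writer; not the KGX JsonSink implementation."""

    def __init__(self, fh: TextIO) -> None:
        self.fh = fh
        self.section: Optional[str] = None
        self.fh.write("{")

    def _open(self, name: str) -> None:
        if self.section == name:
            # Another item in the array that is already open.
            self.fh.write(",")
            return
        if self.section is not None:
            # Close the previous array before starting the next one.
            self.fh.write("],")
        self.fh.write(f'"{name}":[')
        self.section = name

    def write_node(self, record: Dict) -> None:
        self._open("nodes")
        self.fh.write(json.dumps(record))

    def write_edge(self, record: Dict) -> None:
        self._open("edges")
        self.fh.write(json.dumps(record))

    def finalize(self) -> None:
        if self.section is not None:
            self.fh.write("]")
        self.fh.write("}")


buffer = io.StringIO()
writer = StreamingJsonWriter(buffer)
writer.write_node({"id": "EX:1"})
writer.write_node({"id": "EX:2"})
writer.write_edge({"subject": "EX:1", "predicate": "biolink:related_to", "object": "EX:2"})
writer.finalize()
document = json.loads(buffer.getvalue())
```

Each record is serialized and written as soon as it is received, so memory use stays independent of the number of records.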

kgx.sink.jsonl_sink

JsonlSink is responsible for writing a KGX formatted JSON Lines using the jsonlines library.

KGX writes two separate JSON Lines files - one for nodes and another for edges.

class kgx.sink.jsonl_sink.JsonlSink(filename: str, format: str = 'jsonl', compression: Optional[str] = None, **kwargs: Any)

Bases: kgx.sink.sink.Sink

JsonlSink is responsible for writing data as records to JSON lines.

Parameters
  • filename (str) – The filename to write to

  • format (str) – The file format (jsonl)

  • compression (Optional[str]) – The compression type (gz)

  • kwargs (Any) – Any additional arguments

finalize() → None

Perform any operations after writing the file.

set_reverse_prefix_map(m: Dict) → None

Update default reverse prefix map.

Parameters

m (Dict) – A dictionary with IRI to prefix mappings

write_edge(record: Dict) → None

Write an edge record to JSON.

Parameters

record (Dict) – An edge record

write_node(record: Dict) → None

Write a node record to JSON.

Parameters

record (Dict) – A node record
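In the JSON Lines format each record is one JSON object on its own line. The sketch below shows this with the standard json module instead of the jsonlines library. The JsonLinesWriter class, the read_jsonl helper, and the _nodes.jsonl and _edges.jsonl suffixes are assumptions made for the illustration and are not taken from JsonlSink.

```python
import json
import os
import tempfile
from typing import Dict, List


class JsonLinesWriter:
    """Hypothetical stand-in; not the KGX JsonlSink implementation."""

    def __init__(self, basename: str) -> None:
        self.nodes_path = f"{basename}_nodes.jsonl"
        self.edges_path = f"{basename}_edges.jsonl"
        self._nfh = open(self.nodes_path, "w", encoding="utf-8")
        self._efh = open(self.edges_path, "w", encoding="utf-8")

    def write_node(self, record: Dict) -> None:
        # One JSON object per line.
        self._nfh.write(json.dumps(record) + "\n")

    def write_edge(self, record: Dict) -> None:
        self._efh.write(json.dumps(record) + "\n")

    def finalize(self) -> None:
        self._nfh.close()
        self._efh.close()


def read_jsonl(path: str) -> List[Dict]:
    """Read a JSON Lines file back into a list of records."""
    with open(path, encoding="utf-8") as fh:
        return [json.loads(line) for line in fh if line.strip()]


workdir = tempfile.mkdtemp()
writer = JsonLinesWriter(os.path.join(workdir, "graph"))
writer.write_node({"id": "EX:1", "name": "first"})
writer.write_node({"id": "EX:2", "name": "second"})
writer.write_edge({"subject": "EX:1", "predicate": "biolink:related_to", "object": "EX:2"})
writer.finalize()

nodes = read_jsonl(writer.nodes_path)
edges = read_jsonl(writer.edges_path)
```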

kgx.sink.trapi_sink

TrapiSink has yet to be implemented.

In principle, TrapiSink is responsible for writing a Translator Reasoner API formatted JSON.

class kgx.sink.trapi_sink.TrapiSink(filename: str, format: str, compression: Optional[str] = None, **kwargs: Any)

Bases: kgx.sink.tsv_sink.TsvSink

finalize() → None

Close file handles and create an archive if compression mode is defined.

set_edge_properties(edge_properties: List) → None

Update edge properties index with a given list.

Parameters

edge_properties (List) – A list of edge properties

set_node_properties(node_properties: List) → None

Update node properties index with a given list.

Parameters

node_properties (List) – A list of node properties

set_reverse_prefix_map(m: Dict) → None

Update default reverse prefix map.

Parameters

m (Dict) – A dictionary with IRI to prefix mappings

write_edge(record: Dict) → None

Write an edge record to the underlying store.

Parameters

record (Dict) – An edge record

write_node(record: Dict) → None

Write a node record to the underlying store.

Parameters

record (Dict) – A node record

kgx.sink.neo_sink

NeoSink is responsible for writing data to a local or remote Neo4j instance.

class kgx.sink.neo_sink.NeoSink(uri: str, username: str, password: str, **kwargs: Any)

Bases: kgx.sink.sink.Sink

NeoSink is responsible for writing data as records to a Neo4j instance.

Parameters
  • uri (str) – The URI for the Neo4j instance. For example, http://localhost:7474

  • username (str) – The username

  • password (str) – The password

  • kwargs (Any) – Any additional arguments

static create_constraint_query(category: str) → str

Create a Cypher CONSTRAINT query

Parameters

category (str) – The category to create a constraint on

Returns

The Cypher CONSTRAINT query

Return type

str

create_constraints(categories: Union[set, list]) → None

Create a unique constraint on node ‘id’ for all categories in Neo4j.

Parameters

categories (Union[set, list]) – Set of categories

finalize() → None

Write any remaining cached node and/or edge records.

static generate_unwind_edge_query(edge_predicate: str) → str

Generate an UNWIND Cypher query for saving edges into Neo4j.

The query uses self.DEFAULT_NODE_CATEGORY to quickly look up the required subject and object nodes.

Parameters

edge_predicate (str) – Edge label as string

Returns

The UNWIND Cypher query

Return type

str

static generate_unwind_node_query(category: str) → str

Generate an UNWIND Cypher query for saving nodes into Neo4j.

There should be a CONSTRAINT in Neo4j for self.DEFAULT_NODE_CATEGORY. The query uses self.DEFAULT_NODE_CATEGORY as the node label to increase speed for adding nodes. The query also sets label to self.DEFAULT_NODE_CATEGORY for any node to make sure that the CONSTRAINT applies.

Parameters

category (str) – Node category

Returns

The UNWIND Cypher query

Return type

str

static sanitize_category(category: List) → List

Sanitize category for use in an UNWIND Cypher clause. This method adds escape characters to each element in the category list to ensure the category is processed correctly.

Parameters

category (List) – Category

Returns

Sanitized category list

Return type

List
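To make the escaping and the UNWIND pattern concrete, here is a rough sketch. It is not the query that NeoSink generates. It assumes that escaping means wrapping each category in backticks (Cypher's quoting for labels that contain characters such as a colon), that the default node category is biolink:NamedThing, and that the batch of records is passed as a $nodes query parameter. The function unwind_node_query is a hypothetical name.

```python
from typing import List


def sanitize_category(category: List[str]) -> List[str]:
    """Wrap each category in backticks so it can be used as a Cypher label."""
    return [f"`{c}`" for c in category]


def unwind_node_query(
    categories: List[str], default_label: str = "biolink:NamedThing"
) -> str:
    """Build an UNWIND query that merges a batch of nodes by id."""
    labels = ":".join(sanitize_category(categories))
    return (
        "UNWIND $nodes AS node "
        # Matching on the default label lets a uniqueness constraint apply.
        f"MERGE (n:`{default_label}` {{id: node.id}}) "
        # Copy all record properties onto the node and add its category labels.
        f"SET n += node, n:{labels}"
    )


escaped = sanitize_category(["biolink:Gene"])
query = unwind_node_query(["biolink:Gene", "biolink:NamedThing"])
```

A single UNWIND query of this kind processes a whole list of records in one round trip, which is why it pairs naturally with a record cache.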

set_reverse_prefix_map(m: Dict) → None

Update default reverse prefix map.

Parameters

m (Dict) – A dictionary with IRI to prefix mappings

write_edge(record) → None

Cache an edge record that is to be written to Neo4j. The cached edge records are written to Neo4j when the number of cached records exceeds CACHE_SIZE.

Parameters

record (Dict) – An edge record

write_node(record) → None

Cache a node record that is to be written to Neo4j. The cached node records are written to Neo4j when the number of cached records exceeds CACHE_SIZE.

Parameters

record (Dict) – A node record
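The caching behaviour described for write_node, write_edge, and finalize can be sketched as follows. The BatchingWriter class, its flush callback, and the small CACHE_SIZE value are hypothetical and chosen for the illustration. The real NeoSink writes to Neo4j rather than calling a callback.

```python
from typing import Callable, Dict, List


class BatchingWriter:
    """Hypothetical cache-and-flush writer; not the KGX NeoSink implementation."""

    CACHE_SIZE = 2  # deliberately tiny for the example

    def __init__(self, flush: Callable[[List[Dict]], None]) -> None:
        self._flush = flush
        self._cache: List[Dict] = []

    def write_node(self, record: Dict) -> None:
        self._cache.append(record)
        # Write the batch once the cache exceeds CACHE_SIZE.
        if len(self._cache) > self.CACHE_SIZE:
            self._write_cache()

    def finalize(self) -> None:
        # Write whatever is still cached after the last record.
        if self._cache:
            self._write_cache()

    def _write_cache(self) -> None:
        self._flush(list(self._cache))
        self._cache.clear()


batches: List[List[Dict]] = []
writer = BatchingWriter(flush=batches.append)
for i in range(5):
    writer.write_node({"id": f"EX:{i}"})
writer.finalize()
batch_sizes = [len(batch) for batch in batches]
```

Without the call to finalize, the last two records would stay in the cache and never reach the store.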

kgx.sink.rdf_sink

RdfSink is responsible for writing data as RDF N-Triples.

class kgx.sink.rdf_sink.RdfSink(filename: str, format: str = 'nt', compression: Optional[bool] = None, reify_all_edges: bool = False, **kwargs: Any)

Bases: kgx.sink.sink.Sink

RdfSink is responsible for writing data as records to an RDF serialization.

Note

Currently only RDF N-Triples serialization is supported.

Parameters
  • filename (str) – The filename to write to

  • format (str) – The file format (nt)

  • compression (str) – The compression type (gz)

  • reify_all_edges (bool) – Whether or not to reify all the edges

  • kwargs (Any) – Any additional arguments

finalize() → None

Perform any operations after writing the file.

Returns a Biolink Model element for a given predicate.

Parameters

predicate (Any) – The CURIE of a predicate

Returns

The corresponding Biolink Model element

Return type

Optional[Element]

process_predicate(p: Union[rdflib.term.URIRef, str, None]) → Tuple

Process a predicate where the method checks if there is a mapping in Biolink Model.

Parameters

p (Optional[Union[URIRef, str]]) – The predicate

Returns

A tuple that contains the Biolink CURIE (if available), the Biolink slot_uri CURIE (if available), the CURIE form of p, the reference of p

Return type

Tuple

reify(u: str, v: str, data: Dict) → Dict

Create a node representation of an edge.

Parameters
  • u (str) – Subject

  • v (str) – Object

  • data (Dict) – Edge data

Returns

The reified node

Return type

Dict
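Reification turns an edge into a node that carries the edge's subject, object, and properties. The sketch below illustrates the idea on plain dictionaries. It is not the RdfSink implementation, which works with rdflib terms. The generated urn:uuid identifier and the biolink:Association type are assumptions made for the example.

```python
import uuid
from typing import Dict


def reify(u: str, v: str, data: Dict) -> Dict:
    """Return a node-like record that represents the edge from u to v."""
    node = dict(data)  # copy, so the caller's edge data is left untouched
    # Reuse the edge id when present, otherwise mint a new identifier.
    node.setdefault("id", f"urn:uuid:{uuid.uuid4()}")
    node["subject"] = u
    node["object"] = v
    node["type"] = "biolink:Association"  # assumed type for reified edges
    return node


edge = {"id": "e1", "predicate": "biolink:related_to"}
reified = reify("EX:1", "EX:2", edge)
anonymous = reify("EX:1", "EX:2", {"predicate": "biolink:related_to"})
```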

set_property_types(m: Dict) → None

Set export type for properties that are not in Biolink Model.

Parameters

m (Dict) – A dictionary where the keys are property names and values are their corresponding types.

set_reverse_predicate_mapping(m: Dict) → None

Set reverse predicate mappings.

Use this method to update mappings for predicates that are not in Biolink Model.

Parameters

m (Dict) – A dictionary where the keys are property names and values are their corresponding IRI.

set_reverse_prefix_map(m: Dict) → None

Update default reverse prefix map.

Parameters

m (Dict) – A dictionary with IRI to prefix mappings

uriref(identifier: str) → rdflib.term.URIRef

Generate an rdflib.URIRef for a given string.

Parameters

identifier (str) – Identifier as string.

Returns

URIRef form of the input identifier

Return type

rdflib.URIRef

write_edge(record: Dict) → None

Write an edge record as triples.

Parameters

record (Dict) – An edge record

write_node(record: Dict) → None

Write a node record as triples.

Parameters

record (Dict) – A node record
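What writing a node record as triples can look like in N-Triples is sketched below, without rdflib. This is an illustration only and not the RdfSink implementation. The PREFIXES map, the expand, literal, and node_to_ntriples helpers, and the choice to emit name as rdfs:label and category as rdf:type are all assumptions made for the example.

```python
from typing import Dict, List

# Hypothetical prefix map used to expand CURIEs into IRIs.
PREFIXES = {
    "biolink": "https://w3id.org/biolink/vocab/",
    "EX": "http://example.org/",
}
RDF_TYPE = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"
RDFS_LABEL = "http://www.w3.org/2000/01/rdf-schema#label"


def expand(curie: str) -> str:
    """Expand a CURIE such as EX:1 into a full IRI."""
    prefix, _, local = curie.partition(":")
    return PREFIXES[prefix] + local


def literal(value: str) -> str:
    """Quote a string as an N-Triples literal, escaping special characters."""
    escaped = (
        value.replace("\\", "\\\\")
        .replace('"', '\\"')
        .replace("\n", "\\n")
        .replace("\r", "\\r")
    )
    return f'"{escaped}"'


def node_to_ntriples(record: Dict) -> List[str]:
    """Return one N-Triples line per statement about the node."""
    subject = f"<{expand(record['id'])}>"
    lines = [
        f"{subject} <{RDF_TYPE}> <{expand(c)}> ." for c in record.get("category", [])
    ]
    if "name" in record:
        lines.append(f"{subject} <{RDFS_LABEL}> {literal(record['name'])} .")
    return lines


lines = node_to_ntriples(
    {"id": "EX:1", "name": 'gene "A"', "category": ["biolink:Gene"]}
)
```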