Sink¶
A Sink can be implemented for any file or any local or remote store to which a graph can be written. A Sink is responsible for writing the nodes and edges of a graph.
A Sink must subclass the kgx.sink.sink.Sink class and must implement the following methods:
__init__
write_node
write_edge
finalize
__init__ method¶
The __init__ method is used to instantiate a Sink with the configuration required for writing to a store.
In the case of files, the __init__ method takes the filename and format as arguments.
In the case of a graph store like Neo4j, the __init__ method takes the uri, username, and password as arguments.
The __init__ method also accepts optional kwargs, which can be used to supply a variable number of additional arguments, depending on the requirements of the store for which the Sink is being implemented.
write_node method¶
Responsible for receiving a node record and writing it to a file/store.
write_edge method¶
Responsible for receiving an edge record and writing it to a file/store.
finalize method¶
Any operation that needs to be performed after writing all the nodes and edges to a file/store must be defined in this method.
For example:
kgx.sink.tsv_sink.TsvSink has a finalize method that closes the file handles and creates an archive, if compression is desired.
kgx.sink.neo_sink.NeoSink has a finalize method that writes any cached node and edge records.
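The four methods described above can be sketched as a small custom Sink. This is a minimal illustration, not part of KGX: the `Sink` class below is a stand-in for kgx.sink.sink.Sink so that the example runs without KGX installed, `ListSink` and its `indent` option are hypothetical, and the method names follow the `write_node` / `write_edge` signatures listed in the API reference further down this page.

```python
import json
from typing import Any, Dict, List


class Sink:
    """Stand-in for kgx.sink.sink.Sink (assumption, so the sketch is self-contained)."""

    def write_node(self, record: Dict) -> None:
        raise NotImplementedError

    def write_edge(self, record: Dict) -> None:
        raise NotImplementedError

    def finalize(self) -> None:
        raise NotImplementedError


class ListSink(Sink):
    """Hypothetical Sink that buffers records and dumps them to one JSON file."""

    def __init__(self, filename: str, **kwargs: Any):
        self.filename = filename
        # Store-specific options arrive through kwargs; `indent` is made up here.
        self.indent = kwargs.get("indent", 2)
        self.nodes: List[Dict] = []
        self.edges: List[Dict] = []

    def write_node(self, record: Dict) -> None:
        # Receive one node record and hand it to the store (here: a buffer).
        self.nodes.append(record)

    def write_edge(self, record: Dict) -> None:
        # Receive one edge record and hand it to the store (here: a buffer).
        self.edges.append(record)

    def finalize(self) -> None:
        # Work that must happen after the last record: flush the buffers to disk.
        with open(self.filename, "w") as fh:
            json.dump({"nodes": self.nodes, "edges": self.edges}, fh, indent=self.indent)
```

A caller would create the sink, call `write_node` and `write_edge` once per record, and call `finalize` exactly once at the end.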
kgx.sink.sink¶
Base class for all Sinks in KGX.
-
class kgx.sink.sink.Sink
Bases: object
A Sink is responsible for writing data as records to a store where the store is a file or a database.
-
finalize
() → None[source]¶ Operations that ought to be done after writing all the incoming data should be called by this method.
-
set_reverse_prefix_map
(m: Dict) → None[source]¶ Update default reverse prefix map.
- Parameters
m (Dict) – A dictionary with IRI to prefix mappings
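As an illustration of the dictionary shape that `set_reverse_prefix_map` takes, the sketch below builds an IRI-to-prefix mapping and uses it to contract an IRI into a CURIE. The `contract` helper and the two example mappings are assumptions made for this example; how KGX applies the map internally is not described on this page.

```python
from typing import Dict, Optional

# IRI-to-prefix mappings: keys are IRI bases, values are CURIE prefixes.
reverse_prefix_map: Dict[str, str] = {
    "http://purl.obolibrary.org/obo/MONDO_": "MONDO",
    "http://identifiers.org/hgnc/": "HGNC",
}


def contract(iri: str, rpm: Dict[str, str]) -> Optional[str]:
    """Hypothetical helper: contract an IRI to a CURIE using the longest matching base."""
    for base in sorted(rpm, key=len, reverse=True):
        if iri.startswith(base):
            return f"{rpm[base]}:{iri[len(base):]}"
    return None  # no mapping known for this IRI
```

A mapping of this shape is what would be passed as `m`, for example `sink.set_reverse_prefix_map(reverse_prefix_map)`.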
-
kgx.sink.graph_sink¶
GraphSink is responsible for writing to an instance of kgx.graph.base_graph.BaseGraph and must use only the methods exposed by BaseGraph to access the graph.
-
class kgx.sink.graph_sink.GraphSink(graph: kgx.graph.base_graph.BaseGraph = None)
Bases: kgx.sink.sink.Sink
GraphSink is responsible for writing data as records to an in-memory graph representation.
The underlying store is determined by the graph store class defined in config (kgx.graph.nx_graph.NxGraph, by default).
- Parameters
graph (kgx.graph.base_graph.BaseGraph) – An instance of BaseGraph to write to
-
set_reverse_prefix_map
(m: Dict) → None¶ Update default reverse prefix map.
- Parameters
m (Dict) – A dictionary with IRI to prefix mappings
kgx.sink.tsv_sink¶
TsvSink is responsible for writing a KGX formatted CSV or TSV using Pandas.
KGX writes two separate files - one for nodes and another for edges.
-
class kgx.sink.tsv_sink.TsvSink(filename: str, format: str, compression: Optional[str] = None, **kwargs: Any)
Bases: kgx.sink.sink.Sink
TsvSink is responsible for writing data as records to a TSV/CSV.
- Parameters
filename (str) – The filename to write to
format (str) – The file format (tsv, csv)
compression (str) – The compression type (tar, tar.gz)
kwargs (Any) – Any additional arguments
-
set_edge_properties
(edge_properties: List) → None[source]¶ Update edge properties index with a given list.
- Parameters
edge_properties (List) – A list of edge properties
-
set_node_properties
(node_properties: List) → None[source]¶ Update node properties index with a given list.
- Parameters
node_properties (List) – A list of node properties
-
set_reverse_prefix_map
(m: Dict) → None¶ Update default reverse prefix map.
- Parameters
m (Dict) – A dictionary with IRI to prefix mappings
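The two-file layout described for TsvSink can be sketched with the standard library. This is an illustration under stated assumptions, not the TsvSink implementation: it uses the `csv` module rather than Pandas, the `_nodes.tsv` / `_edges.tsv` file names are assumed, and the column lists are hypothetical stand-ins for the node and edge properties that `set_node_properties` and `set_edge_properties` would supply.

```python
import csv
from typing import Dict, List, Tuple

NODE_COLUMNS = ["id", "category", "name"]          # hypothetical node properties
EDGE_COLUMNS = ["subject", "predicate", "object"]  # hypothetical edge properties


def write_table(path: str, records: List[Dict], columns: List[str]) -> None:
    """Write records as tab-separated rows; missing properties become empty cells."""
    with open(path, "w", newline="") as fh:
        writer = csv.DictWriter(
            fh, fieldnames=columns, delimiter="\t",
            restval="", extrasaction="ignore", lineterminator="\n",
        )
        writer.writeheader()
        writer.writerows(records)


def write_graph(basename: str, nodes: List[Dict], edges: List[Dict]) -> Tuple[str, str]:
    """Write one file for nodes and another for edges; return both paths."""
    node_path, edge_path = f"{basename}_nodes.tsv", f"{basename}_edges.tsv"
    write_table(node_path, nodes, NODE_COLUMNS)
    write_table(edge_path, edges, EDGE_COLUMNS)
    return node_path, edge_path
```

Fixing the column list up front keeps every row aligned with the header even when individual records carry different sets of properties.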
kgx.sink.json_sink¶
JsonSink is responsible for writing a KGX formatted JSON using the jsonstreams library, which allows for streaming records to the file.
-
class kgx.sink.json_sink.JsonSink(filename: str, format: str = 'json', compression: Optional[str] = None, **kwargs: Any)
Bases: kgx.sink.sink.Sink
JsonSink is responsible for writing data as records to a JSON.
- Parameters
filename (str) – The filename to write to
format (str) – The file format (json)
compression (Optional[str]) – The compression type (gz)
kwargs (Any) – Any additional arguments
-
set_reverse_prefix_map
(m: Dict) → None¶ Update default reverse prefix map.
- Parameters
m (Dict) – A dictionary with IRI to prefix mappings
kgx.sink.jsonl_sink¶
JsonlSink is responsible for writing a KGX formatted JSON Lines using the jsonlines library.
KGX writes two separate JSON Lines files - one for nodes and another for edges.
-
class kgx.sink.jsonl_sink.JsonlSink(filename: str, format: str = 'jsonl', compression: Optional[str] = None, **kwargs: Any)
Bases: kgx.sink.sink.Sink
JsonlSink is responsible for writing data as records to JSON lines.
- Parameters
filename (str) – The filename to write to
format (str) – The file format (jsonl)
compression (Optional[str]) – The compression type (gz)
kwargs (Any) – Any additional arguments
-
set_reverse_prefix_map
(m: Dict) → None¶ Update default reverse prefix map.
- Parameters
m (Dict) – A dictionary with IRI to prefix mappings
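The JSON Lines format itself is simple enough to sketch with the standard library: each record is serialized as one JSON object on its own line. The example below uses `json` rather than the jsonlines library, and the `write_jsonl` helper is hypothetical; it only shows the shape of the output.

```python
import json
from typing import Dict, Iterable


def write_jsonl(path: str, records: Iterable[Dict]) -> None:
    """Write one JSON object per line (hypothetical helper, standard library only)."""
    with open(path, "w") as fh:
        for record in records:
            fh.write(json.dumps(record) + "\n")
```

Because each line is independent, such a file can be written and read record by record without holding the whole graph in memory.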
kgx.sink.trapi_sink¶
TrapiSink has yet to be implemented.
In principle, TrapiSink is responsible for writing a Translator Reasoner API formatted JSON.
-
class kgx.sink.trapi_sink.TrapiSink(filename: str, format: str, compression: Optional[str] = None, **kwargs: Any)
Bases: kgx.sink.tsv_sink.TsvSink
-
finalize
() → None¶ Close file handles and create an archive if compression mode is defined.
-
set_edge_properties
(edge_properties: List) → None¶ Update edge properties index with a given list.
- Parameters
edge_properties (List) – A list of edge properties
-
set_node_properties
(node_properties: List) → None¶ Update node properties index with a given list.
- Parameters
node_properties (List) – A list of node properties
-
set_reverse_prefix_map
(m: Dict) → None¶ Update default reverse prefix map.
- Parameters
m (Dict) – A dictionary with IRI to prefix mappings
-
write_edge
(record: Dict) → None¶ Write an edge record to the underlying store.
- Parameters
record (Dict) – An edge record
-
write_node
(record: Dict) → None¶ Write a node record to the underlying store.
- Parameters
record (Dict) – A node record
-
kgx.sink.neo_sink¶
NeoSink is responsible for writing data to a local or remote Neo4j instance.
-
class kgx.sink.neo_sink.NeoSink(uri: str, username: str, password: str, **kwargs: Any)
Bases: kgx.sink.sink.Sink
NeoSink is responsible for writing data as records to a Neo4j instance.
- Parameters
uri (str) – The URI for the Neo4j instance. For example, http://localhost:7474
username (str) – The username
password (str) – The password
kwargs (Any) – Any additional arguments
-
static create_constraint_query(category: str) → str
Create a Cypher CONSTRAINT query
- Parameters
category (str) – The category to create a constraint on
- Returns
The Cypher CONSTRAINT query
- Return type
str
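For orientation, a uniqueness constraint on `id` for one category could be built as in the sketch below. The function name and the exact query text are assumptions, not the string that `create_constraint_query` is documented to return. The `CREATE CONSTRAINT ON ... ASSERT` form is the older Cypher syntax; Neo4j 5 uses `CREATE CONSTRAINT FOR ... REQUIRE` instead.

```python
def constraint_query(category: str) -> str:
    """Hypothetical sketch: unique-`id` constraint for one node label."""
    # Backticks allow labels that contain ':' such as biolink:Gene.
    return f"CREATE CONSTRAINT ON (n:`{category}`) ASSERT n.id IS UNIQUE"
```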
-
create_constraints(categories: Union[set, list]) → None
Create a unique constraint on node ‘id’ for all categories in Neo4j.
- Parameters
categories (Union[set, list]) – Set of categories
-
static generate_unwind_edge_query(edge_predicate: str) → str
Generate UNWIND cypher query for saving edges into Neo4j.
The query uses self.DEFAULT_NODE_CATEGORY to quickly look up the required subject and object nodes.
- Parameters
edge_predicate (str) – Edge label as string
- Returns
The UNWIND cypher query
- Return type
str
-
static generate_unwind_node_query(category: str) → str
Generate UNWIND cypher query for saving nodes into Neo4j.
There should be a CONSTRAINT in Neo4j for self.DEFAULT_NODE_CATEGORY. The query uses self.DEFAULT_NODE_CATEGORY as the node label to increase speed for adding nodes. The query also sets the label to self.DEFAULT_NODE_CATEGORY for any node to make sure that the CONSTRAINT applies.
- Parameters
category (str) – Node category
- Returns
The UNWIND cypher query
- Return type
str
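The idea behind an UNWIND node query can be sketched as follows: a whole batch of node records is passed as one list parameter, each record is merged on the default node label (where the uniqueness constraint lives), and the specific category is then added as an extra label. The value of `DEFAULT_NODE_CATEGORY`, the `$nodes` parameter name, and the query text are all assumptions for illustration; the query that KGX generates may differ.

```python
DEFAULT_NODE_CATEGORY = "biolink:NamedThing"  # assumed value, for illustration only


def unwind_node_query(category: str) -> str:
    """Hypothetical sketch of a batched node upsert."""
    return (
        "UNWIND $nodes AS node "
        # MERGE on the constrained label so the lookup by id can use the constraint's index.
        f"MERGE (n:`{DEFAULT_NODE_CATEGORY}` {{id: node.id}}) "
        # Copy all record properties onto the node and add the specific category label.
        f"SET n += node, n:`{category}`"
    )
```

Sending one such query per batch, with the records bound to `$nodes`, avoids a round trip to the database for every node.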
-
static sanitize_category(category: List) → List
Sanitize category for use in UNWIND cypher clause. This method adds escape characters to each element in category list to ensure the category is processed correctly.
- Parameters
category (List) – Category
- Returns
Sanitized category list
- Return type
List
-
set_reverse_prefix_map
(m: Dict) → None¶ Update default reverse prefix map.
- Parameters
m (Dict) – A dictionary with IRI to prefix mappings
kgx.sink.rdf_sink¶
RdfSink is responsible for writing data as RDF N-Triples.
-
class kgx.sink.rdf_sink.RdfSink(filename: str, format: str = 'nt', compression: Optional[bool] = None, reify_all_edges: bool = False, **kwargs: Any)
Bases: kgx.sink.sink.Sink
RdfSink is responsible for writing data as records to an RDF serialization.
Note
Currently only RDF N-Triples serialization is supported.
- Parameters
filename (str) – The filename to write to
format (str) – The file format (nt)
compression (str) – The compression type (gz)
reify_all_edges (bool) – Whether or not to reify all the edges
kwargs (Any) – Any additional arguments
-
get_biolink_element
(predicate: Any) → Optional[linkml_runtime.linkml_model.meta.Element][source]¶ Returns a Biolink Model element for a given predicate.
- Parameters
predicate (Any) – The CURIE of a predicate
- Returns
The corresponding Biolink Model element
- Return type
Optional[Element]
-
process_predicate
(p: Union[rdflib.term.URIRef, str, None]) → Tuple[source]¶ Process a predicate where the method checks if there is a mapping in Biolink Model.
- Parameters
p (Optional[Union[URIRef, str]]) – The predicate
- Returns
A tuple that contains the Biolink CURIE (if available), the Biolink slot_uri CURIE (if available), the CURIE form of p, the reference of p
- Return type
Tuple
-
reify
(u: str, v: str, data: Dict) → Dict[source]¶ Create a node representation of an edge.
- Parameters
u (str) – Subject
v (str) – Object
k (str) – Edge key
data (Dict) – Edge data
- Returns
The reified node
- Return type
Dict
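Reification turns an edge into a node-like record that carries its own identifier, so that edge properties can be attached to it. The sketch below shows one way this could look. It is an assumption-laden illustration rather than the `reify` implementation: the generated `urn:uuid:` identifier and the default `biolink:Association` type are choices made for this example.

```python
import uuid
from typing import Dict


def reify_edge(u: str, v: str, data: Dict) -> Dict:
    """Hypothetical sketch: represent the edge u -> v as a node-like record."""
    node = dict(data)  # copy so the caller's edge data is left untouched
    node.setdefault("id", f"urn:uuid:{uuid.uuid4()}")  # assumed id scheme
    node["subject"] = u
    node["object"] = v
    node.setdefault("type", "biolink:Association")  # assumed default type
    return node
```

Each property of the returned record can then be written as a statement about the reified node.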
-
set_property_types
(m: Dict) → None[source]¶ Set export type for properties that are not in Biolink Model.
- Parameters
m (Dict) – A dictionary where the keys are property names and values are their corresponding types.
-
set_reverse_predicate_mapping
(m: Dict) → None[source]¶ Set reverse predicate mappings.
Use this method to update mappings for predicates that are not in Biolink Model.
- Parameters
m (Dict) – A dictionary where the keys are property names and values are their corresponding IRI.
-
set_reverse_prefix_map
(m: Dict) → None¶ Update default reverse prefix map.
- Parameters
m (Dict) – A dictionary with IRI to prefix mappings
-
uriref
(identifier: str) → rdflib.term.URIRef[source]¶ Generate a rdflib.URIRef for a given string.
- Parameters
identifier (str) – Identifier as string.
- Returns
URIRef form of the input identifier
- Return type
rdflib.URIRef