Transformer
The Transformer class connects a source to a sink: records are read from the source and written to the sink.
The Transformer supports two modes:
No streaming
Streaming
No streaming
In this mode, records are read from a source and written to an intermediate graph. This intermediate graph can then be used as a substrate for various graph operations.
from kgx.transformer import Transformer
input_args = {'filename': ['graph_nodes.tsv', 'graph_edges.tsv'], 'format': 'tsv'}
output_args = {'filename': 'graph.json', 'format': 'json'}
t = Transformer(stream=False)
# read from TSV
t.transform(input_args=input_args)
# The intermediate graph store can be accessed via t.store.graph
# write to JSON
t.save(output_args=output_args)
Streaming
In this mode, records are read from a source and written to a sink on the fly.
from kgx.transformer import Transformer
input_args = {'filename': ['graph_nodes.tsv', 'graph_edges.tsv'], 'format': 'tsv'}
output_args = {'filename': 'graph.json', 'format': 'json'}
t = Transformer(stream=True)
# read from TSV and write to JSON
t.transform(input_args=input_args, output_args=output_args)
Note that the transform operation accepts an optional inspector Callable argument, which injects node/edge data stream inspection into the Transformer.process operation invoked by Transformer.transform. See the unit test module test_transformer.py for an example of how this callable argument is used.
This feature, when coupled with the --stream option and a ‘null’ Transformer Sink (i.e. output_args = {'format': 'null'}), allows “just-in-time” processing of the nodes and edges of huge graphs without incurring a large in-memory footprint.
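As a rough illustration of the inspector argument of transform, the sketch below defines a callable that tallies nodes and edges without mutating them. To keep it runnable without KGX installed, it uses a local stand-in for GraphEntityType and feeds the callable hypothetical records by hand; the record shapes shown are assumptions for illustration, not the library's documented layout.

```python
from collections import Counter
from enum import Enum
from typing import List


class GraphEntityType(Enum):
    # Local stand-in for kgx's GraphEntityType (assumption: NODE and EDGE members).
    NODE = "node"
    EDGE = "edge"


class RecordCounter:
    """Callable inspector that counts records per entity type and leaves them unchanged."""

    def __init__(self) -> None:
        self.counts: Counter = Counter()

    def __call__(self, entity_type: GraphEntityType, record: List) -> None:
        # Only read from the record; the inspector must not mutate it.
        self.counts[entity_type] += 1


counter = RecordCounter()

# With KGX installed, the callable would be passed along these lines:
#   t = Transformer(stream=True)
#   t.transform(input_args=input_args,
#               output_args={'format': 'null'},
#               inspector=counter)
# Here the calls are simulated with hypothetical records instead.
counter(GraphEntityType.NODE, ["n1", {"id": "n1"}])
counter(GraphEntityType.NODE, ["n2", {"id": "n2"}])
counter(GraphEntityType.EDGE, ["n1", "n2", "e1", {"subject": "n1", "object": "n2"}])

print(dict(counter.counts))
```

Keeping the tally on an object rather than in global variables makes it easy to read the totals once the transform call returns.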
kgx.transformer
class kgx.transformer.Transformer(stream: bool = False)
Bases: object
The Transformer class is responsible for transforming data from one form to another.
- Parameters
stream (bool) – Whether or not to stream
get_sink(**kwargs: Dict) → kgx.sink.sink.Sink
Get an instance of Sink that corresponds to a given format.
- Parameters
kwargs (Dict) – Arguments required for initializing an instance of Sink
- Returns
An instance of kgx.sink.Sink
- Return type
kgx.sink.sink.Sink
get_source(format: str) → kgx.source.source.Source
Get an instance of Source that corresponds to a given format.
- Parameters
format (str) – The input store format
- Returns
An instance of kgx.source.Source
- Return type
kgx.source.source.Source
process(source: Generator, sink: kgx.sink.sink.Sink) → None
This method is responsible for reading from source and writing to sink by calling the relevant methods based on the incoming data.
Note
The streamed data must not be mutated.
- Parameters
source (Generator) – A generator from a Source
sink (kgx.sink.sink.Sink) – An instance of Sink
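The dispatch that process describes can be sketched roughly as follows. This is not the library's implementation: ListSink, toy_source, the write_node/write_edge method names, and the convention that 4-tuples are edges and shorter tuples are nodes are all assumptions made for illustration.

```python
from typing import Any, Dict, Generator, List, Tuple


class ListSink:
    """Hypothetical in-memory sink, used only for this sketch."""

    def __init__(self) -> None:
        self.nodes: List[Dict[str, Any]] = []
        self.edges: List[Dict[str, Any]] = []

    def write_node(self, record: Dict[str, Any]) -> None:
        self.nodes.append(record)

    def write_edge(self, record: Dict[str, Any]) -> None:
        self.edges.append(record)


def toy_source() -> Generator[Tuple, None, None]:
    """Hypothetical source: 2-tuples are nodes, 4-tuples are edges."""
    yield ("A", {"id": "A"})
    yield ("B", {"id": "B"})
    yield ("A", "B", "A-B", {"subject": "A", "object": "B"})


def process(source: Generator, sink: ListSink) -> None:
    """Route each record to the matching sink method without mutating it."""
    for record in source:
        if len(record) == 4:
            sink.write_edge(record[-1])  # last element carries the edge data
        else:
            sink.write_node(record[-1])  # last element carries the node data


sink = ListSink()
process(toy_source(), sink)
print(len(sink.nodes), len(sink.edges))
```

Because the source is a generator, records are pulled one at a time, so only the sink decides how much is held in memory.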
save(output_args: Dict) → None
Save data from the in-memory store to a desired sink.
- Parameters
output_args (Dict) – Arguments relevant to your output sink
transform(input_args: Dict, output_args: Optional[Dict] = None, inspector: Optional[Callable[[kgx.GraphEntityType, List], None]] = None) → None
Transform an input source and write to an output sink.
If output_args is not defined then the data is persisted to an in-memory graph.
The ‘inspector’ argument is an optional Callable which the Transformer.process() method applies to ‘inspect’ source records prior to writing them out to the Sink. The first (GraphEntityType) argument of the Callable tags the record as a NODE or an EDGE. The second argument given to the Callable is the current record itself. This Callable is strictly meant to be procedural and should not mutate the record.
- Parameters
input_args (Dict) – Arguments relevant to your input source
output_args (Optional[Dict]) – Arguments relevant to your output sink (
inspector (Optional[Callable[[GraphEntityType, List], None]]) – Optional Callable to ‘inspect’ source records during processing.