Transformer

The Transformer class connects a source to a sink: records are read from the source and written to the sink.

The Transformer supports two modes:

  • No streaming

  • Streaming

No streaming

In this mode, records are read from a source and written to an intermediate graph. This intermediate graph can then be used as a substrate for various graph operations.

from kgx.transformer import Transformer

input_args = {'filename': ['graph_nodes.tsv', 'graph_edges.tsv'], 'format': 'tsv'}
output_args = {'filename': 'graph.json', 'format': 'json'}

t = Transformer(stream=False)

# read from TSV
t.transform(input_args=input_args)

# The intermediate graph store can be accessed via t.store.graph

# write to JSON
t.save(output_args=output_args)

Streaming

In this mode, records are read from a source and written to a sink on the fly.

from kgx.transformer import Transformer

input_args = {'filename': ['graph_nodes.tsv', 'graph_edges.tsv'], 'format': 'tsv'}
output_args = {'filename': 'graph.json', 'format': 'json'}

t = Transformer(stream=True)

# read from TSV and write to JSON
t.transform(input_args=input_args, output_args=output_args)
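
The difference between the two modes can be illustrated without kgx at all. The following is a minimal sketch, not the library's implementation: read_records, transform_buffered and transform_streaming are hypothetical names, the records are toy data, and a plain list stands in for a sink.

```python
from typing import Dict, Iterator, List, Tuple

Record = Tuple[str, Dict[str, str]]


def read_records() -> Iterator[Record]:
    """Toy source: yields (node_id, node_data) pairs one at a time."""
    yield ("EX:1", {"name": "node one"})
    yield ("EX:2", {"name": "node two"})


def transform_buffered(source: Iterator[Record]) -> Dict[str, Dict[str, str]]:
    """Non-streaming: load every record into an intermediate store first."""
    store: Dict[str, Dict[str, str]] = {}
    for node_id, data in source:
        store[node_id] = data
    return store


def transform_streaming(source: Iterator[Record], sink: List[str]) -> None:
    """Streaming: hand each record to the sink as soon as it is read."""
    for node_id, data in source:
        sink.append(f"{node_id}\t{data['name']}")


# Non-streaming: the whole graph is available for further operations.
graph = transform_buffered(read_records())

# Streaming: nothing is retained beyond what the sink itself keeps.
lines: List[str] = []
transform_streaming(read_records(), lines)
```

The buffered variant pays for the intermediate store in memory but allows graph operations before writing; the streaming variant only ever holds the current record.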

Note that the transform operation accepts an optional inspector Callable argument, which injects node/edge data stream inspection into the Transformer.process operation invoked by Transformer.transform. See the unit test module test_transformer.py for an example of how to use this callable argument.

This feature, when coupled with the --stream option and a ‘null’ Transformer Sink (i.e. output_args = {'format': 'null'}), allows “just-in-time” processing of the nodes and edges of huge graphs without incurring a large in-memory footprint.
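
As an illustration of such a callable, here is a minimal sketch of a counting inspector. To keep it runnable without kgx, GraphEntityType below is a local stand-in for the kgx enum, RecordCounter is a hypothetical name, and the record layouts in the simulated calls are assumptions rather than a statement of what kgx passes.

```python
from collections import Counter
from enum import Enum
from typing import List


class GraphEntityType(Enum):
    """Stand-in for kgx's GraphEntityType, so the sketch runs without kgx."""
    NODE = "node"
    EDGE = "edge"


class RecordCounter:
    """Callable inspector that tallies records without mutating them."""

    def __init__(self) -> None:
        self.counts: Counter = Counter()

    def __call__(self, entity_type: GraphEntityType, record: List) -> None:
        # Only the entity type is read; the record itself is left untouched.
        self.counts[entity_type] += 1


counter = RecordCounter()

# With kgx installed, the callable would be passed along these lines:
#   t = Transformer(stream=True)
#   t.transform(input_args=input_args,
#               output_args={'format': 'null'},
#               inspector=counter)

# Simulated calls standing in for the ones the transformer would make
# (the record layouts here are assumed for illustration):
counter(GraphEntityType.NODE, ["EX:1", {"name": "node one"}])
counter(GraphEntityType.NODE, ["EX:2", {"name": "node two"}])
counter(GraphEntityType.EDGE, ["EX:1", "EX:2", "e1", {"predicate": "biolink:related_to"}])
```

Using a callable object rather than a bare function gives the inspector a place to accumulate state across records while still matching the Callable signature.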

kgx.transformer

class kgx.transformer.Transformer(stream: bool = False)

Bases: object

The Transformer class is responsible for transforming data from one form to another.

Parameters

stream (bool) – Whether or not to stream

get_sink(**kwargs: Dict) → kgx.sink.sink.Sink

Get an instance of Sink that corresponds to a given format.

Parameters

kwargs (Dict) – Arguments required for initializing an instance of Sink

Returns

An instance of kgx.sink.Sink

Return type

Sink

get_source(format: str) → kgx.source.source.Source

Get an instance of Source that corresponds to a given format.

Parameters

format (str) – The input store format

Returns

An instance of kgx.source.Source

Return type

Source
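
A format-to-class lookup of this kind is commonly written as a dictionary registry. The sketch below shows that general pattern only; it is not taken from the kgx source, and the class names, the SOURCE_BY_FORMAT mapping and the error handling are all hypothetical.

```python
from typing import Dict, Type


class Source:
    """Hypothetical base class for anything records can be read from."""


class ToyTsvSource(Source):
    """Toy stand-in for a TSV reader."""


class ToyJsonSource(Source):
    """Toy stand-in for a JSON reader."""


# Hypothetical registry; the real mapping of format names to classes may differ.
SOURCE_BY_FORMAT: Dict[str, Type[Source]] = {
    "tsv": ToyTsvSource,
    "json": ToyJsonSource,
}


def get_source(format: str) -> Source:
    """Return a new Source instance for the given format name."""
    try:
        source_class = SOURCE_BY_FORMAT[format]
    except KeyError:
        raise ValueError(f"Unsupported source format: {format!r}") from None
    return source_class()


tsv_source = get_source("tsv")
```

The same pattern would apply to sinks, with the format name taken from the keyword arguments.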

process(source: Generator, sink: kgx.sink.sink.Sink) → None

This method is responsible for reading from source and writing to sink by calling the relevant methods based on the incoming data.

Note

The streamed data must not be mutated.

Parameters
  • source (Generator) – A generator from a Source

  • sink (kgx.sink.sink.Sink) – An instance of Sink
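
The dispatch described above can be sketched as a simple loop. This is an illustrative stand-alone version, not the kgx implementation: ListSink is a toy sink, and the rule that 4-item records are edges while everything else is a node is an assumption made for the example.

```python
from typing import Generator, List


class ListSink:
    """Toy sink that keeps what it receives in two lists."""

    def __init__(self) -> None:
        self.nodes: List[tuple] = []
        self.edges: List[tuple] = []

    def write_node(self, record: tuple) -> None:
        self.nodes.append(record)

    def write_edge(self, record: tuple) -> None:
        self.edges.append(record)


def process(source: Generator, sink: ListSink) -> None:
    """Route each record to the sink method matching its shape.

    Assumption: 4-item records are edges, anything else is a node.
    Records are passed through unchanged, never mutated.
    """
    for record in source:
        if len(record) == 4:
            sink.write_edge(record)
        else:
            sink.write_node(record)


def toy_source() -> Generator:
    """Toy generator standing in for the output of a Source."""
    yield ("EX:1", {"name": "node one"})
    yield ("EX:2", {"name": "node two"})
    yield ("EX:1", "EX:2", "e1", {"predicate": "biolink:related_to"})


sink = ListSink()
process(toy_source(), sink)
```

Because the loop consumes a generator, only one record is in flight at a time unless the sink chooses to retain them.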

save(output_args: Dict) → None

Save data from the in-memory store to a desired sink.

Parameters

output_args (Dict) – Arguments relevant to your output sink

transform(input_args: Dict, output_args: Optional[Dict] = None, inspector: Optional[Callable[[kgx.GraphEntityType, List], None]] = None) → None

Transform an input source and write to an output sink.

If output_args is not defined then the data is persisted to an in-memory graph.

The ‘inspector’ argument is an optional Callable which the transformer.process() method applies to ‘inspect’ source records prior to writing them out to the Sink. The first (GraphEntityType) argument of the Callable tags the record as a NODE or an EDGE. The second argument given to the Callable is the current record itself. This Callable is strictly meant to be procedural and should not mutate the record.

Parameters
  • input_args (Dict) – Arguments relevant to your input source

  • output_args (Optional[Dict]) – Arguments relevant to your output sink (

  • inspector (Optional[Callable[[GraphEntityType, List], None]]) – Optional Callable to ‘inspect’ source records during processing.