Transformer

The Transformer class is responsible for connecting a source to a sink where records are read from the source and written to a sink.

The Transformer supports two modes:

  • No streaming

  • Streaming

No streaming

In this mode, the Transformer reads records from a source and writes to an intermediate graph. One can then use this intermediate graph as a substrate for various graph operations.

from kgx.transformer import Transformer

input_args = {'filename': ['graph_nodes.tsv', 'graph_edges.tsv'], 'format': 'tsv'}
output_args = {'filename': 'graph.json', 'format': 'json'}

t = Transformer(stream=False)

# read from TSV
t.transform(input_args=input_args)

# The intermediate graph store can be accessed via t.store.graph

# write to JSON
t.save(output_args=output_args)

Streaming

In this mode, records are read from a source and written to sink, on-the-fly.

from kgx.transformer import Transformer

input_args = {'filename': ['graph_nodes.tsv', 'graph_edges.tsv'], 'format': 'tsv'}
output_args = {'filename': 'graph.json', 'format': 'json'}

t = Transformer(stream=True)

# read from TSV and write to JSON
t.transform(input_args=input_args, output_args=output_args)

Inspecting the Knowledge Data Flow

Note that transform operation accepts an optional inspect Callable argument which injects node/edge data stream inspection into the Transform.process operation of Transform.transform operations. See the unit test module in the KGX project tests/integration/test_transform.py for an example of usage of this callable argument.

This feature, when coupled with the --stream and a ‘null’ Transformer Sink (i.e. output_args = {'format': 'null'}'), allows “just-in-time” processing of the nodes and edges of huge graphs without incurring a large in-memory footprint.

Provenance of Nodes and Edges

Biolink Model 2.0 specified new properties for edge provenance to replace the (now deprecated) provided_by provenance property (the provided_by property may still be used for node annotation).

One or more of these provenance properties may optionally be inserted as dictionary entries into the input arguments to specify default global values for these properties. Such values will be used when an edge lacks an explicit provenance property. If one does not specify such a global property, then the algorithm heuristically infers and sets a default knowledge_source value.

from kgx.transformer import Transformer

input_args = {
    filename': [
            'graph_nodes.tsv', 
            'graph_edges.tsv'],
    'format': 'tsv',
    'provided_by': "My Test Source",
    'aggregator_knowledge_source': "My Test Source"
    
}

t = Transformer()

# read from TSV 
t.transform(input_args=input_args)

# use the transformed graph
t.store.graph.nodes()
t.store.graph.edges()

InfoRes Identifier Rewriting

The provided_by and/or knowledge_source et al. field values of KGX node and edge records generally contain a name of a knowledge source for the node or edge. In some cases, (e.g. Monarch) such values in source knowledge sources could be quite verbose. To normalize such names to a concise standard, the latest Biolink Model (2.*) is moving towards the use of Information Resource (“InfoRes”) CURIE identifiers.

To help generate and document such InfoRes identifiers, the provenance property values may optionally trigger a rewrite of their knowledge source names to a candidate InfoRes, as follows:

  1. Setting the provenance property to a boolean *True or (case insensitive) string “True” triggers a simple reformatting of knowledge source names into lower case alphanumeric strings removing non-alphanumeric characters and replacing space delimiting words, with hyphens.

  2. Setting the provenance property to a boolean *False or (case insensitive) string “False” suppresses the given provenance annotation on the output graph.

  3. Providing a tuple with a single string argument not equal to True, then the string assumed to be a standard (Pythonic) regular expression to match against knowledge source names. If you do not provide any other string argument (see below), then a matching substring in the name triggers deletion of the matched patter. The simple reformatting (as in 1 above) is then applied to the resulting string.

  4. Similar to 2 above, except providing a second string in the tuple which is substituted for the regular expression matched string, followed by simple reformatting.

  5. Providing a third string in the tuple to add a prefix string to the name (as a separate word) of all the generated InfoRes identifiers. Note that if one sets the first and second elements of the tuple to empty strings, the result is the simple addition of a prefix to the provenance property value. Again, the algorithm then applies the simple reformatting rules, but no other internal changes.

The unit tests provide examples of these various rewrites, in the KGX project tests/integration/test_transform.py.

The catalog of inferred InfoRes mappings onto knowledge source names is available programmatically, after completion of transform call by using the get_infores_catalog() method of the Transformer class. The transform call of the CLI now also takes a multi-valued --knowledge-sources argument, which either facilitates the aforementioned infores processing. Note that quoted comma-delimited strings demarcate the tuple rewrite specifications noted above.

kgx.transformer

class kgx.transformer.Transformer(stream: bool = False, infores_catalog: Optional[str] = None)[source]

Bases: object

The Transformer class is responsible for transforming data from one form to another.

Parameters
  • stream (bool) – Whether or not to stream

  • infores_catalog (Optional[str]) – Optional dump of a TSV file of InfoRes CURIE to Knowledge Source mappings

get_infores_catalog()[source]
Return catalog of Information Resource mappings

aggregated from all Transformer associated sources

get_sink(**kwargs: Dict) → kgx.sink.sink.Sink[source]

Get an instance of Sink that corresponds to a given format.

Parameters

kwargs (Dict) – Arguments required for initializing an instance of Sink

Returns

An instance of kgx.sink.Sink

Return type

Sink

get_source(format: str) → kgx.source.source.Source[source]

Get an instance of Source that corresponds to a given format.

Parameters

format (str) – The input store format

Returns

An instance of kgx.source.Source

Return type

Source

process(source: Generator, sink: kgx.sink.sink.Sink) → None[source]

This method is responsible for reading from source and writing to sink by calling the relevant methods based on the incoming data.

Note

The streamed data must not be mutated.

Parameters
  • source (Generator) – A generator from a Source

  • sink (kgx.sink.sink.Sink) – An instance of Sink

save(output_args: Dict) → None[source]

Save data from the in-memory store to a desired sink.

Parameters

output_args (Dict) – Arguments relevant to your output sink

transform(input_args: Dict, output_args: Optional[Dict] = None, inspector: Optional[Callable[[kgx.utils.kgx_utils.GraphEntityType, List], None]] = None) → None[source]

Transform an input source and write to an output sink.

If output_args is not defined then the data is persisted to an in-memory graph.

The ‘inspector’ argument is an optional Callable which the transformer.process() method applies to ‘inspect’ source records prior to writing them out to the Sink. The first (GraphEntityType) argument of the Callable tags the record as a NODE or an EDGE. The second argument given to the Callable is the current record itself. This Callable is strictly meant to be procedural and should not mutate the record.

Parameters
  • input_args (Dict) – Arguments relevant to your input source

  • output_args (Optional[Dict]) – Arguments relevant to your output sink (

  • inspector (Optional[Callable[[GraphEntityType, List], None]]) – Optional Callable to ‘inspect’ source records during processing.