jStaff

Reputation: 710

How to persist externally obtained stateful data in apache-beam python?

In my Apache Beam job I call an external source, GCP Storage, to enrich the data. For general purposes this can be thought of as an HTTP call; the important part is that it is an external call made to enrich the job.

For every piece of data I process, I call this API to obtain some information to enrich it. There are a large number of repeated calls for the same data.

Is there a good way to cache or store the results so they can be reused across the data being processed, limiting the amount of network traffic required? These calls are a massive bottleneck for processing.

Upvotes: 1

Views: 682

Answers (2)

robertwb

Reputation: 5104

You can consider persisting this value as instance state on your DoFn. For example:

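import apache_beam as beam


class MyDoFn(beam.DoFn):
    # some_api_call() stands in for your external (GCS/HTTP) lookup.

    def __init__(self):
        # This will be called during construction and pickled to the workers.
        self.value1 = some_api_call()

    def setup(self):
        # This will be called once for each DoFn instance (generally
        # once per worker), good for non-pickleable stuff that won't change.
        self.value2 = some_api_call()
        # Per-instance cache, kept across bundles. A plain dict grows without
        # bound; use a size-bounded LRU if there are many distinct keys.
        self.some_lru_cache = {}

    def start_bundle(self):
        # This will be called per-bundle, possibly many times on a worker.
        self.value3 = some_api_call()

    def process(self, element):
        # This is called on each element.
        key = ...  # derive the lookup key from element
        if key not in self.some_lru_cache:
            # Only hit the external source on a cache miss.
            self.some_lru_cache[key] = some_api_call(key)
        value4 = self.some_lru_cache[key]
        # Use self.value1, self.value2, self.value3 and/or value4 here.
        yield element, value4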
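One way to implement the some_lru_cache idea with a bounded size is sketched below, using only the Python standard library. BoundedLRUCache, get_or_compute, max_size and misses are hypothetical names for illustration, not part of the Beam API.

```python
from collections import OrderedDict
from typing import Any, Callable, Hashable


class BoundedLRUCache:
    """Size-bounded least-recently-used cache (hypothetical helper, not a Beam API)."""

    def __init__(self, max_size: int = 10_000) -> None:
        self.max_size = max_size
        self.misses = 0  # number of times compute() actually had to run
        self._data: "OrderedDict[Hashable, Any]" = OrderedDict()

    def get_or_compute(self, key: Hashable, compute: Callable[[Hashable], Any]) -> Any:
        if key in self._data:
            # Cache hit: mark the entry as most recently used.
            self._data.move_to_end(key)
            return self._data[key]
        # Cache miss: do the expensive (e.g. network) lookup once.
        self.misses += 1
        value = compute(key)
        self._data[key] = value
        if len(self._data) > self.max_size:
            # Evict the least recently used entry (the front of the dict).
            self._data.popitem(last=False)
        return value
```

In a DoFn you would create the cache in setup() (for example self.cache = BoundedLRUCache(max_size=10_000)) and call self.cache.get_or_compute(key, some_api_call) in process(). The cache lives per DoFn instance, so each worker still makes its own first call for a given key; it only removes the repeats within that instance.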

Upvotes: 1

Anton

Reputation: 2539

There is no built-in persistence layer in Beam. You have to fetch the data you want to look up yourself, and this may happen on a fleet of workers that all need access to that data.

However, you might want to consider accessing your data as a side input. You would preload all of the data once, and then you would not need to query the external source for each element: https://beam.apache.org/documentation/programming-guide/#side-inputs

For GCS specifically, you might want to try one of the existing IOs, e.g. TextIO (linked here is the Java SDK version; the Python SDK equivalent is apache_beam.io.ReadFromText): https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
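Combining the two suggestions above, here is a minimal Python sketch that reads a lookup table from GCS with ReadFromText and passes it to every element as a dict side input (beam.pvalue.AsDict), so no per-element network call is made. It assumes the lookup file has one "key,value" pair per line and that each input line is itself a lookup key; the paths and the helper names parse_lookup_line, enrich and run are hypothetical. Beam is imported inside run() only so the pure helpers can be tested without Beam installed.

```python
from typing import Dict, Tuple


def parse_lookup_line(line: str) -> Tuple[str, str]:
    # Assumes each line of the (hypothetical) lookup file is "key,value".
    key, value = line.split(",", 1)
    return key.strip(), value.strip()


def enrich(record: str, lookup: Dict[str, str]) -> Tuple[str, str]:
    # Plain in-memory dict lookup: no external call per element.
    return record, lookup.get(record, "UNKNOWN")


def run(lookup_path: str, input_path: str, output_path: str) -> None:
    # Imported here so the helpers above can be unit-tested without Beam.
    import apache_beam as beam

    with beam.Pipeline() as p:
        lookup = (
            p
            | "ReadLookup" >> beam.io.ReadFromText(lookup_path)  # e.g. a gs:// path
            | "ParseLookup" >> beam.Map(parse_lookup_line)
        )
        (
            p
            | "ReadRecords" >> beam.io.ReadFromText(input_path)
            # The whole lookup PCollection is materialized as a dict on each worker.
            | "Enrich" >> beam.Map(enrich, lookup=beam.pvalue.AsDict(lookup))
            | "Format" >> beam.MapTuple(lambda key, value: f"{key},{value}")
            | "Write" >> beam.io.WriteToText(output_path)
        )
```

The trade-off is memory: AsDict makes the full lookup table available to every worker, so this fits when the reference data is small enough to hold in memory. For very large or frequently changing reference data, per-instance caching in the DoFn is the more practical option.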

Upvotes: 0
