Reputation: 710
In my Apache Beam job I call an external source, GCP Storage; for all practical purposes this can be treated like an HTTP call. The important part is that it is an external call made to enrich the job.
For every piece of data I process, I call this API to obtain information to enrich it. A heavy proportion of these calls repeat for the same data.
Is there a good way to cache or store the results for reuse, so that each piece of data processed requires less network traffic? It is a massive bottleneck for processing.
Upvotes: 1
Views: 682
Reputation: 5104
You can consider persisting these values as instance state on your DoFn. For example:
```python
class MyDoFn(beam.DoFn):

    def __init__(self):
        # Called during construction; the result is pickled to the workers.
        self.value1 = some_api_call()

    def setup(self):
        # Called once for each DoFn instance (generally once per worker);
        # good for non-pickleable state that won't change.
        self.value2 = some_api_call()
        self.some_lru_cache = {}  # per-instance cache for repeated lookups

    def start_bundle(self):
        # Called per bundle, possibly many times on a worker.
        self.value3 = some_api_call()

    def process(self, element):
        # Called on each element.
        key = ...
        if key not in self.some_lru_cache:
            self.some_lru_cache[key] = some_api_call()
        value4 = self.some_lru_cache[key]
        # Use self.value1, self.value2, self.value3 and/or value4 here.
```
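If a simple size-bounded cache is enough, the standard library's `functools.lru_cache` can stand in for the hand-rolled dict above (in a DoFn you would typically build the cached callable in `setup()` so it isn't pickled). A minimal sketch, where `fetch_metadata` is a hypothetical stand-in for the external call:

```python
import functools

call_count = 0  # track how many times the "external" call actually fires

@functools.lru_cache(maxsize=1024)
def fetch_metadata(key):
    # Hypothetical stand-in for the GCS/API lookup.
    global call_count
    call_count += 1
    return {"key": key, "meta": "payload-for-%s" % key}

# Repeated lookups for the same key hit the cache, not the network.
results = [fetch_metadata(k) for k in ["a", "b", "a", "a", "b"]]
# Only two distinct keys, so only two real calls were made.
```

`maxsize` bounds memory on each worker; set it to `None` for an unbounded cache if the key space is small.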
Upvotes: 1
Reputation: 2539
There is no internal persistence layer in Beam; you have to fetch the data you want to process, and that fetching can happen on a fleet of workers, each of which needs access to the data.
However, you might want to consider accessing your data as a side input. You preload all the data once and no longer need to query the external source for each element: https://beam.apache.org/documentation/programming-guide/#side-inputs
For GCS specifically, you might want to try an existing IO connector, e.g. TextIO: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
Upvotes: 0