Reputation: 1305
I am using Apache Beam (SDK 2.40.0) with the GCP Dataflow runner in a streaming pipeline. For processing my data I need a configuration that can be altered at any time, so I reload it every 2 minutes (an acceptable delay) as a side input like this:
import apache_beam as beam
from apache_beam.transforms import trigger, window
from apache_beam.transforms.periodicsequence import PeriodicImpulse

configs = (
    p
    | PeriodicImpulse(fire_interval=120, apply_windowing=False)
    | 'Global Window' >> beam.WindowInto(
        window.GlobalWindows(),
        trigger=trigger.Repeatedly(trigger.AfterProcessingTime(5)),
        accumulation_mode=trigger.AccumulationMode.DISCARDING
    )
    | 'Get Side Input' >> beam.ParDo(GetConfigsFn())
)
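GetConfigsFn essentially fetches the current configs on each impulse; a minimal sketch of such a DoFn (load_configs_from_store stands in for the actual loader, which I've omitted) emits (key, value) pairs so the output can later be consumed via beam.pvalue.AsDict:

class GetConfigsFn(beam.DoFn):
    # Minimal sketch; load_configs_from_store is a hypothetical loader.
    def process(self, impulse):
        # Emit (key, value) pairs so that beam.pvalue.AsDict can be
        # applied to the resulting PCollection downstream.
        for key, value in load_configs_from_store().items():
            yield key, value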
With an additional print statement I have verified that the configs are successfully loaded every 2 minutes and output into a PCollection.
I use the configs in another step where I process Pub/Sub messages like this (I have left out all irrelevant steps; the messages are in a global window as well):
msgs_with_config = (
    pubsub_messages
    | 'Merge data and configs' >> beam.ParDo(
        AddConfigFromSideInputFn(),
        config_dict=beam.pvalue.AsDict(configs))
)
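AddConfigFromSideInputFn itself roughly looks up the matching config from the side input and attaches it to the message; a minimal sketch (the 'config_key' field is an illustrative assumption, not my actual schema):

class AddConfigFromSideInputFn(beam.DoFn):
    # Minimal sketch; 'config_key' is an illustrative message field.
    def process(self, element, config_dict):
        # config_dict is the side input materialized by beam.pvalue.AsDict.
        config = config_dict.get(element['config_key'])
        yield {'message': element, 'config': config}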
The problem I am facing is that the 'Merge data and configs' step uses older versions of the configs instead of the most recent one. It takes an arbitrary amount of time (from a few minutes, to 20 minutes, to several hours) until a newer version of the configs is picked up. My suspicion is that the side input is cached somewhere and not reloaded for every processed message.
Is this a valid explanation, and is the behaviour expected? Are there other possible reasons for it?
How can I avoid this behaviour, so that the most recent version of the side input is always used?
Upvotes: 1
Views: 465
Reputation: 2024
Yes, side inputs are cached in the Dataflow workers and reused across bundles. If you need faster reloading, I would suggest refactoring the pipeline to perform a join of two windowed PCollections instead of using one PCollection as a side input, for example with the CoGroupByKey transform (a minimal sketch follows).
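One possible shape for that join, assuming both streams can be keyed by a common config key (extract_key, ProcessJoinedFn, and the 120-second window size are illustrative assumptions, not part of your original pipeline; configs is assumed to already emit (key, config) pairs, as it must for AsDict to work):

import apache_beam as beam
from apache_beam.transforms import window

# Window both streams identically so CoGroupByKey can align them per window.
windowed_configs = (
    configs  # (key, config) pairs, as previously required by AsDict
    | 'Window configs' >> beam.WindowInto(window.FixedWindows(120))
)

keyed_msgs = (
    pubsub_messages
    | 'Window messages' >> beam.WindowInto(window.FixedWindows(120))
    | 'Key messages' >> beam.Map(lambda msg: (extract_key(msg), msg))
)

joined = (
    {'configs': windowed_configs, 'messages': keyed_msgs}
    | 'Join' >> beam.CoGroupByKey()
    | 'Process' >> beam.ParDo(ProcessJoinedFn())
)

With identical windowing on both inputs, each window's messages are grouped with the configs emitted in that window, so a new config takes effect when its window fires rather than whenever a worker's side-input cache happens to refresh. Note that windows in which PeriodicImpulse did not fire will contain no configs, so the downstream DoFn may need to fall back to the last config it has seen.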
Upvotes: 1