dnnshssm

Reputation: 1305

Apache Beam/Dataflow: pipeline doesn't use most recent version of side input (streaming pipeline with global window and frequently updated side input)

I am using Apache Beam (SDK 2.40.0) with the GCP Dataflow runner and a streaming pipeline. I need to use a configuration for processing my data that can be altered at any time. Therefore, I'm loading it every 2 minutes (acceptable delay) as a side input like this:

configs = (
        p
        | PeriodicImpulse(fire_interval=120, apply_windowing=False)
        | "Global Window" >> beam.WindowInto(
                window.GlobalWindows(),
                trigger=trigger.Repeatedly(trigger.AfterProcessingTime(5)),
                accumulation_mode=trigger.AccumulationMode.DISCARDING
            )
        | 'Get Side Input' >> beam.ParDo(GetConfigsFn())
)

With an additional print statement I have verified that the configs are successfully loaded every 2 minutes and output into a PCollection.

I use the configs in another step where I process PubSub messages like this (I left out all irrelevant steps; the messages are in a global window as well):

msgs_with_config = (
        pubsub_messages
        | 'Merge data and configs' >> beam.ParDo(AddConfigFromSideInputFn(), config_dict=beam.pvalue.AsDict(configs))
)

The problem I am facing is that the 'Merge data and configs' step uses older versions of the configs instead of the most recent one. It takes an arbitrary amount of time (anywhere from a few minutes to 20 minutes to several hours) until a newer version of the configs is used. My suspicion is that the side input is cached somewhere and is not reloaded for every processed message.

Is this a valid explanation for the behaviour, and is it expected? Are there other possible reasons for it?

How can I avoid this behaviour, so that the most recent side input version is always used?

Upvotes: 1

Views: 465

Answers (1)

chamikara

Reputation: 2024

Yes, side inputs are cached in Dataflow workers and reused across bundles. If you need faster re-loading, I would suggest refactoring the pipeline to perform a join of two windowed PCollections instead of using one PCollection as a side input — for example, using the CoGroupByKey transform.

Upvotes: 1
