Thomas W.

Reputation: 516

Slowly Updating Side Inputs & Session Windows - Transform node AppliedPTransform was not replaced as expected

In my Apache Beam streaming pipeline, I have an unbounded Pub/Sub source which I use with session windows.

There is some bounded configuration data which I need to pass into some of the DoFns of the pipeline as a side input. This data resides in BigQuery. It changes slowly: I expect a few changes per month at variable points in time. To combine the bounded and the unbounded data, I applied this pattern, which uses a PeriodicImpulse that fires every hour. On each impulse, a DoFn reads the config data from BQ, transforms it into a dict and returns it.
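As a minimal sketch of the "read rows, build a dict, return it" step described above: the helper below turns row dicts into a single lookup dict and emits exactly one value per impulse, which is what a singleton side input needs. The names rows_to_config_dict, get_segment_config, fetch_rows and the key column segment_id are hypothetical and not taken from the original pipeline; the actual BigQuery read is left to the injected fetch_rows callable.

```python
from typing import Any, Callable, Dict, Iterable, Iterator, Mapping


def rows_to_config_dict(
    rows: Iterable[Mapping[str, Any]], key_field: str = "segment_id"
) -> Dict[Any, Dict[str, Any]]:
    """Index config rows by key_field (hypothetical column name)."""
    config: Dict[Any, Dict[str, Any]] = {}
    for row in rows:
        row_dict = dict(row)  # copy, so the caller's rows are not mutated
        key = row_dict.pop(key_field)
        config[key] = row_dict
    return config


def get_segment_config(
    _impulse: Any, fetch_rows: Callable[[], Iterable[Mapping[str, Any]]]
) -> Iterator[Dict[Any, Dict[str, Any]]]:
    """Emit one config dict per impulse element.

    fetch_rows is an assumed callable that performs the BigQuery read
    and returns an iterable of row dicts.
    """
    yield rows_to_config_dict(fetch_rows())
```

A function of this shape could be applied as ParDo(get_segment_config, fetch_rows=...), since ParDo forwards extra keyword arguments to the wrapped callable.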

Later on, the result of this step is passed to one of the DoFns of the main pipeline as a side input.

When I run the pipeline with the DirectRunner, I get a rather unspecific error: RuntimeError: Transform node AppliedPTransform(PeriodicImpulse/GenSequence/ProcessKeyedElements/GroupByKey/GroupByKey, _GroupByKeyOnly) was not replaced as expected.

However, if I replace the PeriodicImpulse step with a simple Create(["DummyValue"]), the pipeline works fine (except, of course, that it ignores all changes made to the config data after the initial read from BQ).

What do I need to change in order to get it working?


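from apache_beam import Filter, GroupByKey, ParDo, Pipeline, WindowInto, io, pvalue
from apache_beam.io import WriteToPubSub
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.transforms.trigger import AccumulationMode, AfterCount

# pipeline_args, SUBSCRIPTION, TARGET_TOPIC and the DoFn/filter classes are defined elsewhere.

n = 1
SESSION_GAP_SIZE = 3600 * 24  # session gap in seconds (24 hours)

p_opt = PipelineOptions(
    pipeline_args, streaming=True, save_main_session=True,
    allow_unsafe_triggers=True, runner="DirectRunner"
    , ...)


with Pipeline(options=p_opt) as p:

    # Side input: fire once per hour and re-read the config from BigQuery.
    cfg_data = (
        p
        | "PeriodicImpulse" >> PeriodicImpulse(fire_interval=3600, apply_windowing=True)
        | "Retrieve Segment Config from BQ" >> ParDo(get_segment_config_from_bq)
    )

    # Main input: session-windowed Pub/Sub stream, enriched with the config side input.
    main_p = (
        p
        | "Read Stream from Pub/Sub" >> io.ReadFromPubSub(subscription=SUBSCRIPTION, with_attributes=True)
        | "Filter 1" >> Filter(Filter1())
        | "Filter 2" >> Filter(Filter2())
        | "Decode Pub/Sub Messages" >> ParDo(ReadPubSubMessage())
        | "Extract Composite Key" >> ParDo(ExtractKey())
        | "Build Session Windows" >> WindowInto(
            window.Sessions(SESSION_GAP_SIZE),
            trigger=AfterCount(n),
            accumulation_mode=AccumulationMode.ACCUMULATING)
        | "Another GroupByKey" >> GroupByKey()
        | "Enrich Stream Data by Config" >> ParDo(EnrichWithConfig(), segment_cfg=pvalue.AsSingleton(cfg_data))
        | "Output to PubSub" >> WriteToPubSub(topic=TARGET_TOPIC)
    )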

Upvotes: 0

Views: 82

Answers (2)

no-stale-reads

Reputation: 358

Side inputs seem not to be supported by the DirectRunner; see the docs, here.

Upvotes: 1

Thomas W.

Reputation: 516

This issue appears to be a limitation / bug in the DirectRunner. When using Dataflow Runner, it runs just fine. Looks like this is related to this open issue. Unfortunately, no real workaround at this time. Testing locally with DirectRunner is possible by commenting out the PeriodicImpulse step though, so you can at least debug your side input processing pipeline locally.
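One way to avoid editing the pipeline by hand for each local test is to pick the root transform of the config branch from the runner name. The sketch below assumes, based only on this thread, that PeriodicImpulse fails on the DirectRunner and works on Dataflow; use_one_shot_config and build_config_source are hypothetical helper names, and the Beam imports are kept inside the function so the runner check can be used on its own.

```python
def use_one_shot_config(runner: str) -> bool:
    """Return True when the config should be read once instead of periodically.

    Assumption from this thread: only the DirectRunner needs the fallback.
    """
    return runner.strip().lower() in {"directrunner", "direct"}


def build_config_source(p, runner: str, fire_interval: float = 3600):
    """Return the root PCollection that triggers the config read."""
    # Imported lazily so the helper above has no Beam dependency.
    import apache_beam as beam
    from apache_beam.transforms.periodicsequence import PeriodicImpulse

    if use_one_shot_config(runner):
        # Local fallback: a single element, so the config is read only once.
        return p | "OneShotImpulse" >> beam.Create(["DummyValue"])
    # Periodic refresh, as in the original pipeline.
    return p | "PeriodicImpulse" >> PeriodicImpulse(
        fire_interval=fire_interval, apply_windowing=True
    )
```

With this in place, the config branch would start from build_config_source(p, runner) followed by the existing BigQuery ParDo, so the local run exercises the same side input processing without picking up later config changes.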

Upvotes: 1
