Graham Holker
Graham Holker

Reputation: 3

Windowing appears to work when running on the DirectRunner, but not when running on Cloud Dataflow

I'm trying to break fusion with a GroupByKey. This creates one huge window and since my job is big I'd rather start emitting.

With the direct runner using something like what I found here it seems to work. However, when run on Cloud Dataflow it seems to batch the GBK together and not emit output until the source nodes have "succeeded".

I'm doing a bounded/batch job. I'm extracting the contents of archive files and then writing them to gcs.

Everything works except it takes longer than I expected and cpu utilization is low. I suspect that this is due to fusion -- my hypothesis is that the extraction is fused to the write operation and so there's a pattern of extraction / higher CPU followed by less CPU because we're doing network calls and back again.

The code looks like:

.apply("Window",
  Window.<MyType>into(new GlobalWindows())
    .triggering(
      Repeatedly.forever(                             
        AfterProcessingTime.pastFirstElementInPane()                                
          .plusDelayOf(Duration.standardSeconds(5))))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes()
)
.apply("Add key", MapElements...)
.apply(GroupByKey.create())

Locally I verify using debug logs so that I can see work is being done after the GBK. The timestamp between the first extraction finishing and the first post-GBK op usually reflects the 5s duration (or other values I change it to (1,5,10,20,30)).

On GCP I verify by looking at the pipeline structure and I can see that everything after the GBK is "not started" and the output collection of the GBK is empty ("-") while the input collection has millions of elements.

Edit:

Upvotes: 0

Views: 572

Answers (1)

chamikara
chamikara

Reputation: 2024

Looks like the answer you referred to was for a streaming pipeline (unbounded input). For batch pipeline processing a bounded input, GroupByKey will not emit till all data for a given key has been processed. Please see here for more details.

Upvotes: 1

Related Questions