Jimmy

Apache Beam Combine Function not doing anything

I'm trying to use a simple Combine function (here, Count.perElement()) for the first time, applying fixed windows of 10 seconds. For now I'm just printing some logging inside the transforms to see whether anything actually happens, but the transforms after ExtractStreamingMeasures() never seem to get called. I'm running on the DirectRunner.

Am I missing something?

PipelineOptions options = PipelineOptionsFactory.create();
PubsubOptions dataflowOptions = options.as(PubsubOptions.class);
dataflowOptions.setStreaming(true);

Pipeline p = Pipeline.create(options);

p
            // ... the step that reads Txn elements (e.g. from Pub/Sub) is not shown here ...
            .apply(Window.<Txn>into(FixedWindows.of(Duration.standardSeconds(10))))
            .apply(ParDo.of(new ExtractStreamingMeasures()))
            .apply(Count.<String>perElement())
            .apply(ParDo.of(new DoSomething()));

Transforms:

static class ExtractStreamingMeasures extends DoFn<Txn, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        System.out.println(c.element().getLocationId()); // <= this prints
        c.output(c.element().getLocationId());
    }
}

static class DoSomething extends DoFn<KV<String, Long>, KV<String, Long>> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        System.out.println(c.element()); // <= this doesn't print
        c.output(c.element());
    }
}
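To make the expected output of this pipeline concrete, here is a plain-Java model, with no Beam dependency, of what the windowing plus Count.perElement() step computes: each element is assigned to the 10-second fixed window that contains its event timestamp, and each locationId is counted per window. The Event record is a hypothetical stand-in for Txn. This is a sketch of the semantics only, not Beam's implementation, and it says nothing about *when* results are emitted; that is decided by the window's trigger.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Dependency-free model of "fixed windows + Count.perElement()".
 * Not Beam code: it only reproduces which counts each window would hold.
 */
public class FixedWindowCountModel {

    // Hypothetical stand-in for Txn: an event timestamp (millis) plus a locationId.
    record Event(long timestampMillis, String locationId) {}

    // Start of the fixed window (size in millis, offset 0) containing ts.
    // floorMod keeps this correct for negative timestamps too.
    static long windowStart(long ts, long sizeMillis) {
        return ts - Math.floorMod(ts, sizeMillis);
    }

    // Maps window start -> (locationId -> count); TreeMaps keep the output sorted.
    static Map<Long, Map<String, Long>> countPerWindow(List<Event> events, long sizeMillis) {
        Map<Long, Map<String, Long>> result = new TreeMap<>();
        for (Event e : events) {
            result.computeIfAbsent(windowStart(e.timestampMillis(), sizeMillis), k -> new TreeMap<>())
                  .merge(e.locationId(), 1L, Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        long size = 10_000L; // 10 seconds, as in FixedWindows.of(Duration.standardSeconds(10))
        List<Event> events = List.of(
                new Event(1_000L, "loc-A"),
                new Event(4_500L, "loc-B"),
                new Event(9_999L, "loc-A"),
                new Event(10_000L, "loc-A")); // first millisecond of the next window
        // prints {0={loc-A=2, loc-B=1}, 10000={loc-A=1}}
        System.out.println(countPerWindow(events, size));
    }
}
```

Each (window, locationId) entry corresponds to one KV<String, Long> that DoSomething would receive, but only once that window's trigger fires.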

Answers (1)

Jimmy

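I had to specify a different trigger for the window to fire. The following code uses 10-minute fixed windows and emits a pane 10 seconds (in processing time) after the first element of each pane arrives, so results appear roughly every 10 seconds while data keeps flowing.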

p.apply("AssignToWindow", Window.<Txn>into(FixedWindows.of(Duration.standardMinutes(10)))
                .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10))))
                .accumulatingFiredPanes()
                .withAllowedLateness(Duration.standardDays(1)))
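The difference between the default trigger and the one above can be sketched with a small, dependency-free model of a single window. It is a simplification under stated assumptions, not Beam's implementation: it ignores late data, allowed lateness and the final pane Beam emits when a window expires, and it assumes an element arriving exactly at a firing time goes into the next pane. The default trigger emits once the watermark passes the end of the window, so if the watermark does not advance (one plausible reading of the question's symptom), nothing reaches DoSomething for that window; the repeated processing-time trigger fires regardless of the watermark.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Simplified model of two triggering strategies for ONE window (not Beam code).
 * Ignores late data, allowed lateness and the final pane emitted on window expiry.
 */
public class TriggerModel {

    // One emitted pane: processing time it fired (millis) and the count it reports.
    record Firing(long fireAtMillis, long count) {}

    // Default trigger: a single on-time pane once the watermark reaches the window end.
    // If the watermark never gets there, nothing is emitted.
    static boolean defaultTriggerFires(long watermarkMillis, long windowEndMillis) {
        return watermarkMillis >= windowEndMillis;
    }

    // Models Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay))
    // combined with accumulatingFiredPanes(): each pane fires `delay` after its first element
    // arrives and reports everything seen in the window so far.
    // Assumption: an element arriving exactly at a firing time belongs to the next pane.
    static List<Firing> repeatedProcessingTimeFirings(List<Long> arrivalMillis, long delayMillis) {
        if (delayMillis <= 0) {
            throw new IllegalArgumentException("delay must be positive");
        }
        List<Long> sorted = new ArrayList<>(arrivalMillis);
        Collections.sort(sorted);
        List<Firing> firings = new ArrayList<>();
        long accumulated = 0;
        int i = 0;
        while (i < sorted.size()) {
            long fireAt = sorted.get(i) + delayMillis; // timer starts with the pane's first element
            while (i < sorted.size() && sorted.get(i) < fireAt) {
                accumulated++; // accumulating mode: counts carry over between panes
                i++;
            }
            firings.add(new Firing(fireAt, accumulated));
        }
        return firings;
    }

    public static void main(String[] args) {
        long windowEnd = 600_000L; // a 10-minute window starting at 0
        // Watermark stuck at 0: the default trigger never fires for this window.
        System.out.println("default trigger fires: " + defaultTriggerFires(0L, windowEnd));
        // Elements arriving at 0s, 3s, 12s, 15s and 40s (processing time), 10-second delay.
        List<Firing> panes = repeatedProcessingTimeFirings(
                List.of(0L, 3_000L, 12_000L, 15_000L, 40_000L), 10_000L);
        // prints three panes: fired at 10000 ms (count 2), 22000 ms (count 4), 50000 ms (count 5)
        panes.forEach(System.out::println);
    }
}
```

With discardingFiredPanes() instead, each pane would report only the elements that arrived since the previous firing (2, 2 and 1 in this example) rather than running totals.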
