Sparkle8

Reputation: 235

AggregateFunction getResult() not being called after event aggregation

I'm trying to implement a Flink job that reads a Kafka stream and aggregates events per session, but for some reason getResult() is never called. I can see that createAccumulator() and add() are called, and I expect getResult() to be called as well so that I can sink the aggregated message to the destination.

        source.keyBy(new KeySelector<GenericRecord, String>() {
                    @Override
                    public String getKey(GenericRecord record) {
                        return record.get("id").toString();
                    }})
                .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<GenericRecord>() {
                    private static final long serialVersionUID = -4834111073247835189L;
                    private final long maxTimeLag = 300000L;

                    @Nullable
                    @Override
                    public Watermark checkAndGetNextWatermark(GenericRecord lastElement, long extractedTimestamp) {
                        return new Watermark(extractedTimestamp - maxTimeLag);
                    }

                    @Override
                    public long extractTimestamp(GenericRecord element, long previousElementTimestamp) {
                        long ts = 1000 * (long)element.get(("timestamp"));
                        return (ts);
                    }
                })
                .map(new ReduceAttributesMap())
                .keyBy(new KeySelector<Tuple2<String, String>, String>() {
                    @Override
                    public String getKey(Tuple2<String, String> e) {
                        return e.f0;
                    }
                })
                .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
                .aggregate(new EventAggregation())
                .addSink(...)

What could be the issue? Did I misconfigure something? I appreciate your help!

Upvotes: 2

Views: 1043

Answers (1)

Arvid Heise

Reputation: 3634

AggregateFunction#getResult() is only called when the window is finalized. In your case, the session window is only emitted once a specific key has received no events for 5 minutes of event time and the watermark has advanced past the end of that session. Can you confirm in your data that this case is actually happening?
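
To make the timing concrete, here is a minimal plain-Java sketch (no Flink dependency) of the arithmetic behind an event-time session window firing. It assumes the 5-minute gap and the 300000 ms `maxTimeLag` from the question, and it assumes the usual event-time trigger rule that a window fires once the watermark reaches the window's last timestamp (window end minus 1 ms). The class and method names are hypothetical and only serve the illustration.

```java
// Plain-Java sketch of when an event-time session window can fire.
// Values are taken from the question; names are made up for this example.
final class SessionFiringSketch {
    static final long GAP_MS = 5 * 60 * 1000L; // session gap: Time.minutes(5)
    static final long MAX_LAG_MS = 300_000L;   // maxTimeLag in the question's assigner

    /** Watermark the question's assigner emits after seeing an event with this timestamp. */
    static long watermarkAfter(long eventTsMs) {
        return eventTsMs - MAX_LAG_MS;
    }

    /** End (exclusive) of a session whose last event has the given timestamp. */
    static long windowEnd(long lastEventTsMs) {
        return lastEventTsMs + GAP_MS;
    }

    /** Assumed trigger rule: fire once the watermark reaches windowEnd - 1. */
    static boolean fires(long lastEventTsMs, long newestEventTsMs) {
        return watermarkAfter(newestEventTsMs) >= windowEnd(lastEventTsMs) - 1;
    }

    public static void main(String[] args) {
        long last = 1_600_000_000_000L; // last event of the session, in epoch millis
        // An event 5 minutes later is not enough, because the watermark lags by 5 minutes.
        System.out.println(fires(last, last + GAP_MS));
        // An event 10 minutes later pushes the watermark past the session's end.
        System.out.println(fires(last, last + GAP_MS + MAX_LAG_MS));
    }
}
```

Under these assumptions, getResult() would only run after some later event arrives with a timestamp roughly 10 minutes beyond the session's last event. If the stream simply stops, the watermark stops too and the last sessions stay open. With parallelism above one, an operator's watermark is the minimum across its inputs, so an idle partition can also hold it back.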

You can try reducing the gap of the session window to see the effect more easily. Furthermore, your watermark assigner looks suspicious; you probably want to use BoundedOutOfOrdernessTimestampExtractor instead. Lastly, can you double-check that your timestamp extraction works as expected? Is the timestamp really stored as seconds since 1970?
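
One quick way to check the timestamp question is to convert a sample value the same way extractTimestamp does and see whether the result is a plausible date. The sketch below is plain Java with hypothetical names; the plausibility bounds (after the year 2000, no more than a day in the future) are an arbitrary assumption for illustration, not something Flink enforces.

```java
import java.time.Instant;

// Sanity check for the "seconds since 1970" assumption in extractTimestamp.
// Class name, method names, and bounds are illustrative assumptions.
final class TimestampCheckSketch {
    /** Mirrors the question's extractTimestamp: treats the field as epoch seconds. */
    static long toEpochMillis(long epochSeconds) {
        return 1000L * epochSeconds;
    }

    /** Heuristic: plausible if after 2000-01-01 and at most one day past 'now'. */
    static boolean looksPlausible(long epochMillis, Instant now) {
        Instant ts = Instant.ofEpochMilli(epochMillis);
        return ts.isAfter(Instant.parse("2000-01-01T00:00:00Z"))
                && ts.isBefore(now.plusSeconds(86_400));
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        long sampleSeconds = 1_600_000_000L;          // a value that really is in seconds
        long sampleAlreadyMillis = 1_600_000_000_000L; // a value that is already in millis
        System.out.println(looksPlausible(toEpochMillis(sampleSeconds), now));
        // Multiplying a millisecond value by 1000 again lands far in the future.
        System.out.println(looksPlausible(toEpochMillis(sampleAlreadyMillis), now));
    }
}
```

If the field turned out to hold milliseconds already, the extra factor of 1000 would push every event time tens of thousands of years ahead, and the sessions and watermarks would no longer behave as intended.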

Upvotes: 1
