Reputation: 235
I'm trying to implement a Flink job that reads a Kafka stream and aggregates sessions, but for some reason getResult() is never called. I can see that createAccumulator() and add() are called, and I expect getResult() to be called as well so that I can sink the aggregated message into the destination.
source.keyBy(new KeySelector<GenericRecord, String>() {
        @Override
        public String getKey(GenericRecord record) {
            return record.get("id").toString();
        }
    })
    .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<GenericRecord>() {
        private static final long serialVersionUID = -4834111073247835189L;
        private final long maxTimeLag = 300000L;

        @Nullable
        @Override
        public Watermark checkAndGetNextWatermark(GenericRecord lastElement, long extractedTimestamp) {
            return new Watermark(extractedTimestamp - maxTimeLag);
        }

        @Override
        public long extractTimestamp(GenericRecord element, long previousElementTimestamp) {
            return 1000L * (Long) element.get("timestamp");
        }
    })
    .map(new ReduceAttributesMap())
    .keyBy(new KeySelector<Tuple2<String, String>, String>() {
        @Override
        public String getKey(Tuple2<String, String> e) {
            return e.f0;
        }
    })
    .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
    .aggregate(new EventAggregation())
    .addSink(...)
What could be the issue? Did I misconfigure something? I appreciate your help!
Upvotes: 2
Views: 1043
Reputation: 3634
AggregateFunction#getResult()
is only called when the window is finalized. In your case, the window is only emitted when there have been no events for a specific key for 5 minutes. Can you confirm from your data that this case actually happens?
You can try reducing the gap time of the session window to see it more easily. Furthermore, your watermark assigner looks suspicious: you probably want to use BoundedOutOfOrdernessTimestampExtractor instead. Lastly, can you double-check that your timestamp extraction works as expected? Is the timestamp stored as seconds since 1970?
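A minimal sketch of the suggested change, assuming the Avro field `"timestamp"` holds epoch seconds (the 1000x factor and the 5-minute bound are carried over from your code). Unlike the punctuated assigner, this extractor emits watermarks periodically, trailing the maximum timestamp seen so far:

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Emits periodic watermarks at (max seen timestamp - 5 minutes),
// which lets event-time session windows close once data progresses.
public class RecordTimestampExtractor
        extends BoundedOutOfOrdernessTimestampExtractor<GenericRecord> {

    public RecordTimestampExtractor() {
        super(Time.minutes(5)); // max expected out-of-orderness
    }

    @Override
    public long extractTimestamp(GenericRecord element) {
        // assumes the field is epoch seconds; Flink expects milliseconds
        return 1000L * (Long) element.get("timestamp");
    }
}
```

You would then plug it in with `.assignTimestampsAndWatermarks(new RecordTimestampExtractor())` in place of the anonymous AssignerWithPunctuatedWatermarks.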
Upvotes: 1
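To verify the seconds-vs-milliseconds question offline, a quick plain-Java check (the raw value below is just an illustrative example): interpret the raw field both ways and see which one yields a plausible date. If treating it as milliseconds lands near 1970, the field is in seconds and must be multiplied by 1000.

```java
import java.time.Instant;

public class TimestampCheck {

    // Interpret a raw epoch value as milliseconds since 1970.
    static Instant asMillis(long raw) {
        return Instant.ofEpochMilli(raw);
    }

    // Interpret the same raw value as seconds since 1970.
    static Instant asSeconds(long raw) {
        return Instant.ofEpochSecond(raw);
    }

    public static void main(String[] args) {
        long raw = 1546300800L; // example raw value from a record
        System.out.println("as millis : " + asMillis(raw));  // 1970-01-18T21:31:40.800Z
        System.out.println("as seconds: " + asSeconds(raw)); // 2019-01-01T00:00:00Z
    }
}
```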