Reputation: 197
I can't figure out exactly how watermarking is supposed to work when you ingest data from Apache Kafka.
Events in the format:
hello,1641369936000
hello,1641369937000
hello,1641369938000
hello,1641369939000
...
Topic created with: kafka-topics --bootstrap-server localhost:9092 --topic testerino --partitions 1 --replication-factor 1 --create
Kafka version 3.0.0, Flink 1.14.2
Thanks in advance
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
KafkaSource<String> stringKafkaSource = KafkaSource.<String>builder()
    .setBootstrapServers("localhost:9092")
    .setGroupId("test-group")
    .setStartingOffsets(OffsetsInitializer.latest())
    .setTopics("testerino")
    .setDeserializer(new KafkaRecordDeserializationSchema<String>() {
        @Override
        public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> out) throws IOException {
            System.out.println(record);
            out.collect(new String(record.value()));
        }
        @Override
        public TypeInformation<String> getProducedType() {
            return TypeInformation.of(String.class);
        }
    })
    .build();
DataStreamSource<String> streamSource = env.fromSource(
    stringKafkaSource,
    WatermarkStrategy.<String>forMonotonousTimestamps().withTimestampAssigner((event, timestamp) -> {
        return Long.parseLong(event.split(",")[1]);
    }),
    "source"
);
streamSource
    .keyBy(k -> k.split(",")[0])
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .trigger(new Trigger<String, TimeWindow>() {
        @Override
        public TriggerResult onElement(String element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
            System.out.printf("added elem: %s | timestamp: %s | window: %s| watermark: %d%n",
                element, timestamp, window, ctx.getCurrentWatermark());
            return TriggerResult.CONTINUE;
        }
        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            System.out.println("processing time trigger");
            return TriggerResult.FIRE_AND_PURGE;
        }
        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            System.out.println("event time trigger");
            return TriggerResult.FIRE_AND_PURGE;
        }
        @Override
        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
            System.out.println("clear");
        }
    })
    .process(new ProcessWindowFunction<String, String, String, TimeWindow>() {
        @Override
        public void process(String s, ProcessWindowFunction<String, String, String, TimeWindow>.Context context, Iterable<String> elements, Collector<String> out) throws Exception {
            System.out.println("watermark: " + context.currentWatermark());
            out.collect(s);
        }
    });
Upvotes: 0
Views: 880
Reputation: 43707
Flink never automatically supplies watermarks, but the KafkaSource takes the timestamp stored in each Kafka record and uses it as the timestamp of the StreamRecords it produces. This is the timestamp passed to your timestamp assigner as its second argument.
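To make the two timestamps easier to tell apart, here is a minimal plain-Java sketch with no Flink dependency. The `Assigner` interface and the sample Kafka record timestamp are hypothetical stand-ins chosen for illustration; they only mimic the two-argument shape of the lambda in the question, where the second argument is the timestamp the source already attached.

```java
public class AssignerSketch {
    /** Hypothetical stand-in for the two-argument shape of a timestamp assigner. */
    public interface Assigner<T> {
        long extractTimestamp(T element, long recordTimestamp);
    }

    /** Mirrors the lambda in the question: ignore the record timestamp, parse the payload. */
    public static final Assigner<String> FROM_PAYLOAD =
            (event, recordTimestamp) -> Long.parseLong(event.split(",")[1]);

    /** Alternative: keep whatever timestamp the source already attached to the record. */
    public static final Assigner<String> FROM_RECORD =
            (event, recordTimestamp) -> recordTimestamp;

    public static void main(String[] args) {
        long kafkaRecordTs = 1700000000000L; // made-up Kafka record timestamp (assumption)
        String event = "hello,1641369936000"; // event format taken from the question

        // The payload-based assigner returns the value embedded in the event string.
        System.out.println(FROM_PAYLOAD.extractTimestamp(event, kafkaRecordTs));
        // The record-based assigner returns the timestamp supplied by the source.
        System.out.println(FROM_RECORD.extractTimestamp(event, kafkaRecordTs));
    }
}
```

With the assigner from the question, the event time comes from the payload, so the Kafka record timestamp is effectively ignored.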
I believe https://stackoverflow.com/a/70101290/2000823 explains why you aren't getting any results.
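The following is not a reproduction of the linked answer, only a rough plain-Java sketch of the arithmetic behind a 5-second tumbling event-time window, using the timestamps from the question. It assumes a window offset of 0 and that the monotonous-timestamps strategy emits a watermark one millisecond behind the highest timestamp seen, which is my understanding of its behavior. The class and method names are made up for illustration. In a real job the watermark seen by the window operator is also emitted periodically and is the minimum across all parallel inputs, which this sketch ignores.

```java
public class WatermarkSketch {
    /** Start of the tumbling window (offset 0) that contains a non-negative timestamp. */
    public static long windowStart(long timestamp, long sizeMs) {
        return timestamp - (timestamp % sizeMs);
    }

    /** Last timestamp that still belongs to the window [start, start + size). */
    public static long windowMaxTimestamp(long start, long sizeMs) {
        return start + sizeMs - 1;
    }

    /** Assumed watermark for ascending timestamps: one millisecond behind the maximum seen. */
    public static long watermarkAfter(long maxTimestampSeen) {
        return maxTimestampSeen - 1;
    }

    public static void main(String[] args) {
        long size = 5_000L;
        long first = 1641369936000L; // first event timestamp from the question
        long start = windowStart(first, size);
        long maxTs = windowMaxTimestamp(start, size);

        // Feed one event per second and report when the watermark reaches the window's end.
        for (long ts = first; ts <= first + 5_000L; ts += 1_000L) {
            long wm = watermarkAfter(ts);
            System.out.printf("event=%d watermark=%d windowComplete=%b%n", ts, wm, wm >= maxTs);
        }
    }
}
```

Under these assumptions the window covering the first event is only complete once an event with timestamp 1641369940000 or later has been seen. If the watermark printed by the job never advances, no event-time window can complete regardless of how many events arrive.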
Upvotes: 2