rislah

Reputation: 197

How does Apache Flink generate watermarks when you ingest data from Apache Kafka?

I can't figure out how exactly watermarking is supposed to work when you ingest data from Apache Kafka.

  1. I have read that Flink automatically handles watermarking by taking the timestamp from the message, but the docs don't say exactly where it comes from: the message payload, the headers, or the CreateTime?
  2. I have tried extracting the Unix timestamp in milliseconds from the payload, putting it in a header, and setting it as the CreateTime, but nothing works: the watermark doesn't advance, so the event-time window never fires.

Events in the format:

hello,1641369936000
hello,1641369937000
hello,1641369938000
hello,1641369939000
...
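To see which event-time window each sample event lands in, here is a minimal pure-Java sketch (no Flink dependency) of tumbling-window assignment. It assumes a zero window offset and non-negative timestamps; for that case it follows the same arithmetic as Flink's TimeWindow.getWindowStartWithOffset. The class name WindowMath is hypothetical.

```java
// Hypothetical helper: tumbling-window arithmetic, assuming offset 0 and timestamps >= 0.
public final class WindowMath {
    private WindowMath() {}

    /** Start (inclusive) of the tumbling window that contains the timestamp. */
    public static long windowStart(long timestamp, long windowSizeMs) {
        return timestamp - (timestamp % windowSizeMs);
    }

    /** Last timestamp still inside the window (end - 1), the same convention as TimeWindow.maxTimestamp(). */
    public static long windowMaxTimestamp(long windowStart, long windowSizeMs) {
        return windowStart + windowSizeMs - 1;
    }

    public static void main(String[] args) {
        long size = 5_000L; // Time.seconds(5)
        long[] events = {1641369936000L, 1641369937000L, 1641369938000L, 1641369939000L};
        for (long ts : events) {
            long start = windowStart(ts, size);
            System.out.printf("%d -> [%d, %d)%n", ts, start, start + size);
        }
    }
}
```

All four sample events fall into [1641369935000, 1641369940000), whose last timestamp is 1641369939999, so that window can only fire once the watermark reaches 1641369939999.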

Topic created with:

kafka-topics --bootstrap-server localhost:9092 --topic testerino --partitions 1 --replication-factor 1 --create

Kafka version 3.0.0, Flink 1.14.2

Thanks in advance

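import java.io.IOException;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Class name is illustrative.
public class KafkaWatermarkJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        KafkaSource<String> stringKafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setGroupId("test-group")
                // Only records produced after the job starts are read.
                .setStartingOffsets(OffsetsInitializer.latest())
                .setTopics("testerino")
                .setDeserializer(new KafkaRecordDeserializationSchema<String>() {
                    @Override
                    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> out) throws IOException {
                        // Print the raw record, including its Kafka timestamp.
                        System.out.println(record);
                        out.collect(new String(record.value()));
                    }

                    @Override
                    public TypeInformation<String> getProducedType() {
                        return TypeInformation.of(String.class);
                    }
                })
                .build();

        DataStreamSource<String> streamSource = env.fromSource(
                stringKafkaSource,
                // `timestamp` is the Kafka record timestamp attached by KafkaSource;
                // this assigner ignores it and parses the payload ("hello,<millis>") instead.
                WatermarkStrategy.<String>forMonotonousTimestamps().withTimestampAssigner((event, timestamp) -> {
                    return Long.parseLong(event.split(",")[1]);
                }),
                "source"
        );

        streamSource
                .keyBy(k -> k.split(",")[0])
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .trigger(new Trigger<String, TimeWindow>() {
                    @Override
                    public TriggerResult onElement(String element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
                        System.out.printf("added elem: %s | timestamp: %s | window: %s | watermark: %d%n",
                                element, timestamp, window, ctx.getCurrentWatermark());
                        return TriggerResult.CONTINUE;
                    }

                    @Override
                    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
                        System.out.println("processing time trigger");
                        return TriggerResult.FIRE_AND_PURGE;
                    }

                    @Override
                    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
                        System.out.println("event time trigger");
                        return TriggerResult.FIRE_AND_PURGE;
                    }

                    @Override
                    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
                        System.out.println("clear");
                    }
                })
                .process(new ProcessWindowFunction<String, String, String, TimeWindow>() {
                    @Override
                    public void process(String s, ProcessWindowFunction<String, String, String, TimeWindow>.Context context, Iterable<String> elements, Collector<String> out) throws Exception {
                        System.out.println("watermark: " + context.currentWatermark());
                        out.collect(s);
                    }
                })
                .print();

        env.execute("kafka-watermark-test");
    }
}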
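With forMonotonousTimestamps(), the watermark is the highest timestamp seen so far minus 1 ms, and an event-time window fires once the watermark reaches the window's end minus 1 ms. The hypothetical model below replays the sample events for a single source subtask. It is a simplification: Flink emits these watermarks periodically (by default every 200 ms) rather than after every event.

```java
// Hypothetical single-subtask model of a forMonotonousTimestamps() watermark generator.
public final class MonotonousWatermarkModel {
    // Initial value used for zero out-of-orderness: nothing seen yet.
    private long maxTimestamp = Long.MIN_VALUE + 1;

    /** Record an event timestamp (what the timestamp assigner returns). */
    public void onEvent(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    /** Watermark the generator would emit: highest timestamp seen minus 1 ms. */
    public long currentWatermark() {
        return maxTimestamp - 1;
    }

    /** An event-time window fires once the watermark reaches its last timestamp (end - 1). */
    public static boolean windowFires(long watermark, long windowEndExclusive) {
        return watermark >= windowEndExclusive - 1;
    }

    public static void main(String[] args) {
        MonotonousWatermarkModel gen = new MonotonousWatermarkModel();
        long windowEnd = 1641369940000L; // end of [1641369935000, 1641369940000)
        long[] events = {1641369936000L, 1641369937000L, 1641369938000L, 1641369939000L, 1641369940000L};
        for (long ts : events) {
            gen.onEvent(ts);
            long wm = gen.currentWatermark();
            System.out.printf("event %d -> watermark %d, fires: %b%n", ts, wm, windowFires(wm, windowEnd));
        }
    }
}
```

Under these assumptions the first window cannot fire until an event with timestamp 1641369940000 or later has been read, and because of OffsetsInitializer.latest(), only events produced after the job starts count.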

Upvotes: 0

Views: 880

Answers (1)

David Anderson

Reputation: 43707

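Flink never supplies watermarks automatically; you have to provide a WatermarkStrategy. What the KafkaSource does do is take the timestamp from each Kafka record (its CreateTime or LogAppendTime, depending on the topic configuration) and use it as the timestamp of the StreamRecords it produces. This is the timestamp passed to your timestamp assigner as its second argument.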

I believe https://stackoverflow.com/a/70101290/2000823 explains why you aren't getting any results.
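One common reason a watermark stays stuck in a setup like this (an assumption here; see the linked answer for the specifics) is that createLocalEnvironment() defaults the parallelism to the number of CPU cores while the topic has a single partition. Only one source subtask then receives data, the other subtasks never emit a watermark, and a downstream operator's watermark is the minimum over its inputs. The hypothetical model below (class CombinedWatermark, 4 subtasks assumed) illustrates the effect; in the actual job, env.setParallelism(1) removes the idle subtasks, and WatermarkStrategy.withIdleness(Duration) is the documented way to let idle inputs be ignored.

```java
import java.util.Arrays;

// Hypothetical model of how an operator combines watermarks from parallel inputs.
public final class CombinedWatermark {
    private final long[] inputWatermarks;
    private final boolean[] idle;

    public CombinedWatermark(int inputs) {
        inputWatermarks = new long[inputs];
        idle = new boolean[inputs];
        // No input has emitted a watermark yet.
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    /** A watermark arriving from an input; receiving one marks the input active again. */
    public void onWatermark(int input, long watermark) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        idle[input] = false;
    }

    /** Roughly models an input that idleness detection has flagged as idle. */
    public void markIdle(int input) {
        idle[input] = true;
    }

    /** Combined watermark: the minimum over all inputs that are not idle. */
    public long current() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < inputWatermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, inputWatermarks[i]);
            }
        }
        // Simplification: if every input is idle, report no progress.
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        // Assumed: 4 source subtasks, 1 Kafka partition read only by subtask 0.
        CombinedWatermark combined = new CombinedWatermark(4);
        combined.onWatermark(0, 1641369939999L);
        System.out.println("without idleness: " + combined.current()); // stuck at Long.MIN_VALUE
        for (int i = 1; i < 4; i++) {
            combined.markIdle(i);
        }
        System.out.println("with idle subtasks excluded: " + combined.current());
    }
}
```

With a single input (parallelism 1), the combined watermark is simply that input's watermark, so nothing holds it back.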

Upvotes: 2
