Mazen Ezzeddine

Kafka and Flink: message timestamps, event time and watermarks

I am reading the book Stream Processing with Apache Flink, which states: “As of version 0.10.0, Kafka supports message timestamps. When reading from Kafka version 0.10 or later, the consumer will automatically extract the message timestamp as an event-time timestamp if the application runs in event-time mode.” Does this mean that, inside a processElement function, context.timestamp() returns the Kafka message timestamp by default? Could you please provide a simple example of how to implement an AssignerWithPeriodicWatermarks or AssignerWithPunctuatedWatermarks that extracts timestamps (and builds watermarks) from the consumed Kafka message timestamp?

If I am using TimeCharacteristic.ProcessingTime, would ctx.timestamp() return the processing time? And in that case, would it be equivalent to context.timerService().currentProcessingTime()?

Thank you.

Answers (1)

David Anderson

The Flink Kafka consumer takes care of this for you: it attaches each Kafka record's timestamp to the corresponding StreamRecord. In Flink 1.11 you can simply rely on this, though you still need to provide a WatermarkStrategy that specifies the bounded out-of-orderness (or asserts that the timestamps are in order, e.g. with WatermarkStrategy.forMonotonousTimestamps()):

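import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(...);
// Kafka record timestamps are used as event time; allow up to 20 s of disorder.
myConsumer.assignTimestampsAndWatermarks(
    WatermarkStrategy
        .<String>forBoundedOutOfOrderness(Duration.ofSeconds(20)));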
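To see what the out-of-orderness bound does with the Kafka timestamps, here is a plain-Java model (no Flink dependency) of a periodic bounded-out-of-orderness generator: it tracks the largest timestamp seen so far and, whenever it is asked to emit, returns a watermark that trails that maximum by the bound. The class and method names are hypothetical; the extra `- 1` mirrors Flink's built-in BoundedOutOfOrdernessWatermarks, but treat this as a sketch of the idea, not Flink's source.

```java
import java.time.Duration;

/**
 * Hypothetical plain-Java model of a periodic, bounded-out-of-orderness
 * watermark generator. This is NOT the Flink API.
 */
class BoundedLagWatermarks {
    private final long maxLagMillis;
    // Largest event timestamp (e.g. Kafka record timestamp) seen so far.
    private long maxTimestamp;

    BoundedLagWatermarks(Duration maxLag) {
        this.maxLagMillis = maxLag.toMillis();
        // Start low enough that the first emitted watermark does not underflow.
        this.maxTimestamp = Long.MIN_VALUE + maxLagMillis + 1;
    }

    /** Called once per record with the timestamp the Kafka consumer attached. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Called periodically; returns the current watermark. */
    long onPeriodicEmit() {
        return maxTimestamp - maxLagMillis - 1;
    }
}
```

With a 20-second bound, a record stamped 100 000 ms followed by an older one stamped 90 000 ms leaves the watermark at 79 999 ms: the older record is still within the bound, and the watermark never moves backwards.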

In earlier versions of Flink you had to provide an implementation of a timestamp assigner, which would look like this:

@Override
public long extractTimestamp(Long element, long previousElementTimestamp) {
    // Keep the timestamp the Kafka consumer already attached to the record.
    return previousElementTimestamp;
}

This version of the extractTimestamp method is passed the current value of the timestamp present in the StreamRecord as previousElementTimestamp, which in this case will be the timestamp put there by the Flink Kafka consumer.
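For the punctuated variant, the same pass-through can be combined with a per-record watermark check. The sketch below is a plain-Java model only: `PunctuatedAssigner` is a hypothetical interface that mirrors the two methods of Flink's pre-1.11 AssignerWithPunctuatedWatermarks, returning a nullable `Long` instead of a Flink `Watermark`, and the lag passed to the constructor is an arbitrary assumption.

```java
/**
 * Hypothetical plain-Java mirror of the shape of Flink's pre-1.11
 * AssignerWithPunctuatedWatermarks; not the real interface.
 */
interface PunctuatedAssigner<T> {
    long extractTimestamp(T element, long previousElementTimestamp);

    /** Returns the next watermark, or null if none should be emitted. */
    Long checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
}

/** Keeps the Kafka timestamp and emits a watermark whenever event time advances. */
class KafkaTimestampPassThrough<T> implements PunctuatedAssigner<T> {
    private final long maxLagMillis;
    private long maxTimestamp = Long.MIN_VALUE;

    KafkaTimestampPassThrough(long maxLagMillis) {
        this.maxLagMillis = maxLagMillis;
    }

    @Override
    public long extractTimestamp(T element, long previousElementTimestamp) {
        // previousElementTimestamp is the timestamp the Kafka consumer already
        // attached to the record, so we simply keep it.
        return previousElementTimestamp;
    }

    @Override
    public Long checkAndGetNextWatermark(T lastElement, long extractedTimestamp) {
        if (extractedTimestamp <= maxTimestamp) {
            return null; // event time did not advance: no new watermark
        }
        maxTimestamp = extractedTimestamp;
        return maxTimestamp - maxLagMillis;
    }
}
```

A real punctuated assigner would often emit watermarks only for special marker records; emitting one per advancing record, as here, works but generates a lot of downstream watermark traffic, which is why the periodic style is usually preferred.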

Flink 1.11 docs
Flink 1.10 docs

As for what ctx.timestamp() returns when using TimeCharacteristic.ProcessingTime: in that case it returns null. (Semantically, yes, it is as though the timestamp were the current processing time, but that's not how it's implemented.)
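So a ProcessFunction that may run under either time characteristic should not unbox ctx.timestamp() blindly. A minimal sketch, assuming you want to fall back to the current processing time when no record timestamp is present; the `Timestamps.effectiveTimestamp` helper is hypothetical, and the `LongSupplier` stands in for `ctx.timerService().currentProcessingTime()` so the example runs without Flink.

```java
import java.util.function.LongSupplier;

/** Hypothetical helper; not part of Flink. */
final class Timestamps {
    private Timestamps() {}

    /**
     * @param recordTimestamp what ctx.timestamp() returned (null under processing time)
     * @param processingClock stands in for ctx.timerService()::currentProcessingTime
     */
    static long effectiveTimestamp(Long recordTimestamp, LongSupplier processingClock) {
        // Use the record's (e.g. Kafka) timestamp when present, otherwise "now".
        return recordTimestamp != null ? recordTimestamp : processingClock.getAsLong();
    }
}
```

Inside processElement this could be called as `Timestamps.effectiveTimestamp(ctx.timestamp(), ctx.timerService()::currentProcessingTime)`.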
