SimAzz

Reputation: 137

Flink - How to use withTimestampAssigner to take the time from the event payload (without using Kafka timestamps)

I'm trying to understand how to use withTimestampAssigner() inside a WatermarkStrategy with a Kafka source. The time I need to use is inside the message payload.

To do this I have the following code:

FlinkKafkaConsumer<Event> kafkaData =
        new FlinkKafkaConsumer<>("CorID_0", new EventDeserializationSchema(), p);
kafkaData.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .forMonotonousTimestamps()
                // Not valid Java: this argument is the part I don't know how to write
                .withTimestampAssigner(Event, Event.time));

DataStream<Event> stream = env.addSource(kafkaData);

Where EventDeserializationSchema() is this:

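import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class EventDeserializationSchema implements DeserializationSchema<Event> {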

    private static final long serialVersionUID = 1L;
    
    private static final CsvSchema schema = CsvSchema.builder()
            .addColumn("firstName")
            .addColumn("lastName")
            .addColumn("age", CsvSchema.ColumnType.NUMBER)
            .addColumn("time")
            .build();

    private static final ObjectMapper mapper = new CsvMapper();

    @Override
    public Event deserialize(byte[] message) throws IOException {
        return mapper.readerFor(Event.class).with(schema).readValue(message);
    }

    @Override
    public boolean isEndOfStream(Event nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Event> getProducedType() {
        
        return TypeInformation.of(Event.class);
    }
}

And Event:

import java.io.Serializable;

public class Event implements Serializable {
    public String firstName;
    public String lastName;
    private int age;
    public String time;

    public Event() {
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }
}

What I'm trying to understand is how to provide the time to withTimestampAssigner():

.withTimestampAssigner(???)

The value should come from Event.time, but I don't quite understand how to do that from the Flink documentation.


I have also been searching for examples.


This confused me a bit, because I can't tell whether the solution in my case is straightforward or whether I need additional context. All the examples I found either use .forBoundedOutOfOrderness() or target previous versions of Flink, where the implementation was different, like this one:

kafka flink timestamp Event time and watermark

Thanks!

Upvotes: 4

Views: 2693

Answers (1)

David Anderson

Reputation: 43499

If the source (e.g., FlinkKafkaConsumer) isn't providing the timestamps you want to work with, then you need to provide a TimestampAssigner. This is a function that takes an event and the previously assigned timestamp (if any) as input, and returns the timestamp. In your case that can look something like this:

FlinkKafkaConsumer<Event> kafkaData =
        new FlinkKafkaConsumer<>("CorID_0", new EventDeserializationSchema(), p);

WatermarkStrategy<Event> wmStrategy = 
        WatermarkStrategy
          .<Event>forMonotonousTimestamps()
          .withTimestampAssigner((event, timestamp) -> event.getTime());

DataStream<Event> stream = env.addSource(
        kafkaData.assignTimestampsAndWatermarks(wmStrategy));

(Note: this won't quite work as written, since your getTime() method returns a String. You'll need to parse the string and return a long, typically one representing milliseconds since the epoch.)
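The question doesn't show what the time column actually looks like, so the parsing step has to be guessed. Here is a minimal sketch, assuming the payload holds either a numeric epoch-millisecond value or an ISO-8601 instant such as 2021-03-01T12:00:00Z. EventTimeParser and toEpochMillis are hypothetical names, not part of Flink or of the original code:

```java
import java.time.Instant;

/**
 * Hypothetical helper (not part of Flink) that turns the CSV "time" column
 * into milliseconds since the epoch, the unit Flink uses for event time.
 * ASSUMPTION: the payload holds either epoch millis ("1614600000000")
 * or an ISO-8601 instant ("2021-03-01T12:00:00Z"); adapt to your format.
 */
final class EventTimeParser {

    private EventTimeParser() {
    }

    public static long toEpochMillis(String time) {
        String trimmed = time.trim();
        if (!trimmed.isEmpty() && trimmed.chars().allMatch(Character::isDigit)) {
            // Already a numeric epoch-millisecond value.
            return Long.parseLong(trimmed);
        }
        // Otherwise parse an ISO-8601 instant; throws DateTimeParseException if malformed.
        return Instant.parse(trimmed).toEpochMilli();
    }
}
```

It could then be plugged into the strategy above as .withTimestampAssigner((event, timestamp) -> EventTimeParser.toEpochMillis(event.getTime())). If the column uses a different layout (for example 2021-03-01 12:00:00, with no zone), a DateTimeFormatter with an explicit pattern and time zone would be needed instead of Instant.parse.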

The cases involving a TimestampAssignerSupplier.Context or a WatermarkGeneratorSupplier.Context are for situations where you need access to lower-level APIs to do something more custom. That's not necessary in this case.
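Since the question notes that most examples use forBoundedOutOfOrderness(), it may help to see that both built-in strategies consume the timestamp returned by withTimestampAssigner in the same way. As I understand Flink's built-in generators (BoundedOutOfOrdernessWatermarks, which AscendingTimestampsWatermarks extends with a zero delay), the periodic watermark is the largest timestamp seen so far minus the allowed out-of-orderness minus 1 ms. The class below is a simplified, hypothetical plain-Java model of that rule (WatermarkModel is not a Flink class), intended only for reasoning about which strategy fits your data:

```java
import java.time.Duration;

/**
 * Hypothetical, simplified model (not Flink code) of how the built-in
 * generators derive watermarks from the timestamps your assigner returns:
 * watermark = max timestamp seen - allowed out-of-orderness - 1 ms.
 * forMonotonousTimestamps() corresponds to an out-of-orderness of zero.
 */
final class WatermarkModel {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    WatermarkModel(Duration maxOutOfOrderness) {
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        // Start low enough that the first watermark cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    /** Called once per event with the value the TimestampAssigner produced. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** The watermark a periodic emit would report at this point. */
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }
}
```

With a zero delay, an event whose timestamp is below the current maximum can already be behind the watermark and may be treated as late. If the CSV records can arrive out of order, WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5)) combined with the same timestamp assigner gives them a grace period (the 5 seconds is an arbitrary example value).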

Upvotes: 5
