I'm trying to understand how to use withTimestampAssigner() inside a WatermarkStrategy with a Kafka source. The "time" that I need to use is inside the message payload.
To do this I have the following code:
FlinkKafkaConsumer<Event> kafkaData =
new FlinkKafkaConsumer("CorID_0", new EventDeserializationSchema(), p);
kafkaData.assignTimestampsAndWatermarks(
WatermarkStrategy
.forMonotonousTimestamps()
.withTimestampAssigner(Event, Event.time))
DataStream<Event> stream = env.addSource(kafkaData);
Where EventDeserializationSchema() is this:
public class EventDeserializationSchema implements DeserializationSchema<Event> {
private static final long serialVersionUID = 1L;
private static final CsvSchema schema = CsvSchema.builder()
.addColumn("firstName")
.addColumn("lastName")
.addColumn("age", CsvSchema.ColumnType.NUMBER)
.addColumn("time")
.build();
private static final ObjectMapper mapper = new CsvMapper();
@Override
public Event deserialize(byte[] message) throws IOException {
return mapper.readerFor(Event.class).with(schema).readValue(message);
}
@Override
public boolean isEndOfStream(Event nextElement) {
return false;
}
@Override
public TypeInformation<Event> getProducedType() {
return TypeInformation.of(Event.class);
}
}
And Event:
import java.io.Serializable;
public class Event implements Serializable {
public String firstName;
public String lastName;
private int age;
public String time;
public Event() {
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public String getTime() {
return time;
}
public void setTime(String time) {
this.time = time;
}
}
What I'm trying to understand is how to provide the time to withTimestampAssigner():
.withTimestampAssigner(???))
The value should be Event.time, but I don't quite get it from the Flink documentation page.
I have been searching, and this confused me a bit because I don't understand whether in my case the solution is straightforward or whether I need additional context. All the examples I found either use .forBoundedOutOfOrderness() or target previous versions of Flink where the implementation was different, like this one:
kafka flink timestamp Event time and watermark
Thanks!
Reputation: 43499
If the source (e.g., FlinkKafkaConsumer) isn't providing the timestamps you want to work with, then you need to provide a TimestampAssigner. This is a function that takes an event and the previously assigned timestamp (if any) as input, and returns the timestamp. In your case that can look something like this:
FlinkKafkaConsumer<Event> kafkaData =
new FlinkKafkaConsumer<>("CorID_0", new EventDeserializationSchema(), p);
WatermarkStrategy<Event> wmStrategy =
WatermarkStrategy
.<Event>forMonotonousTimestamps()
.withTimestampAssigner((event, timestamp) -> event.getTime());
DataStream<Event> stream = env.addSource(
kafkaData.assignTimestampsAndWatermarks(wmStrategy));
(Note: this won't quite work, since your getTime() method returns a String. You'll need to parse the string and return a long; typically it will be a long representing milliseconds since the epoch.)
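As a minimal sketch of that parsing step: the question doesn't say what format the time field uses, so the helper below assumes it is either plain epoch milliseconds (e.g. "1609459200000") or an ISO-8601 instant (e.g. "2021-01-01T00:00:00Z"). The class name EventTimes and the method toEpochMillis are hypothetical, not part of Flink or of the code above, and would need adapting to the real payload format.

```java
import java.time.Instant;

// Hypothetical helper, not part of Flink or of the question's code.
final class EventTimes {
    private EventTimes() {}

    // Converts the payload's time string to milliseconds since the epoch.
    // Assumption: the string is either plain epoch milliseconds or an
    // ISO-8601 instant; any other format needs its own parsing logic.
    static long toEpochMillis(String time) {
        String trimmed = time.trim();
        try {
            // First try to read the value as a plain number of milliseconds.
            return Long.parseLong(trimmed);
        } catch (NumberFormatException notANumber) {
            // Otherwise treat it as an ISO-8601 instant; Instant.parse throws
            // DateTimeParseException if the text is not in that format.
            return Instant.parse(trimmed).toEpochMilli();
        }
    }
}
```

With a helper like this, the assigner would become .withTimestampAssigner((event, timestamp) -> EventTimes.toEpochMillis(event.getTime())). A null or malformed time value makes the helper throw, so it may be worth validating records during deserialization.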
The cases involving a TimestampAssignerSupplier.Context or a WatermarkGeneratorSupplier.Context are for situations where you need access to lower-level APIs to do something more custom. That's not necessary in this case.