Reputation: 6342
The following is the definition of `WatermarkGenerator`:
```java
package org.apache.flink.api.common.eventtime;

import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;

@Public
public interface WatermarkGenerator<T> {

    /**
     * Called for every event, allows the watermark generator to examine and remember the
     * event timestamps, or to emit a watermark based on the event itself.
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    /**
     * Called periodically, and might emit a new watermark, or not.
     *
     * <p>The interval in which this method is called and Watermarks are generated
     * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
     */
    void onPeriodicEmit(WatermarkOutput output);
}
```
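To make the split between the two methods concrete, here is a minimal sketch of a bounded-out-of-orderness generator. It is written against simplified stand-ins rather than Flink itself so that it runs on its own: `BoundedLagSketch`, `Output`, `Generator` and `CollectingOutput` are hypothetical names, and Flink's real `WatermarkOutput` takes a `Watermark` object rather than a `long`. The arithmetic is the usual bounded-out-of-orderness rule (highest timestamp seen, minus the allowed lag, minus one). Note that `onEvent` only reads `eventTimestamp` and never inspects the event.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins; these are NOT Flink's classes.
class BoundedLagSketch {

    // Stand-in for WatermarkOutput; Flink's real version takes a Watermark object, not a long.
    interface Output {
        void emitWatermark(long watermark);
    }

    // Same two-method shape as WatermarkGenerator<T>.
    interface Generator<T> {
        void onEvent(T event, long eventTimestamp, Output output);

        void onPeriodicEmit(Output output);
    }

    // Tracks the highest timestamp seen and periodically emits (max - maxLag - 1).
    static final class BoundedLagGenerator<T> implements Generator<T> {
        private final long maxLagMillis;
        private long maxTimestamp;

        BoundedLagGenerator(long maxLagMillis) {
            this.maxLagMillis = maxLagMillis;
            // Chosen so that an emit before any event yields Long.MIN_VALUE instead of underflowing.
            this.maxTimestamp = Long.MIN_VALUE + maxLagMillis + 1;
        }

        @Override
        public void onEvent(T event, long eventTimestamp, Output output) {
            // Only the timestamp matters here; the event's type is never inspected.
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        }

        @Override
        public void onPeriodicEmit(Output output) {
            output.emitWatermark(maxTimestamp - maxLagMillis - 1);
        }
    }

    // Records every emitted watermark so it can be inspected afterwards.
    static final class CollectingOutput implements Output {
        final List<Long> watermarks = new ArrayList<>();

        @Override
        public void emitWatermark(long watermark) {
            watermarks.add(watermark);
        }
    }
}
```

With a lag of 5, events at 100 and 90 yield a watermark of 94 on the next periodic emit; the out-of-order event at 90 does not move it backwards.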
For the `onEvent` method, why is there a redundant argument `long eventTimestamp` in the method signature? I think the event time should be extractable from `T event` (the concrete event type should carry its event time), and it should be equal to `eventTimestamp`. So why is this redundant argument `long eventTimestamp` needed when I can get the timestamp from the event itself? What is the design consideration here?
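For comparison, the design the question has in mind might look roughly like the hypothetical sketch below: the generator reads the time from the event, so every event type has to expose it through some shared interface (`HasEventTime`, `MaxTimeGenerator` and `Reading` are invented names, not part of Flink). A payload that carries no time at all, such as a raw `String` read from a message queue, could not be used with such a generator.

```java
// Hypothetical alternative design, NOT Flink's API: the generator reads time from the event.
class EventCarriesTimeSketch {

    // Every event type would have to implement this interface.
    interface HasEventTime {
        long eventTime();
    }

    // The generator is now coupled to HasEventTime: a raw String or a third-party type cannot be used.
    static final class MaxTimeGenerator<T extends HasEventTime> {
        long maxTime = Long.MIN_VALUE;

        void onEvent(T event) {
            maxTime = Math.max(maxTime, event.eventTime());
        }
    }

    // Example event type that carries its own timestamp.
    static final class Reading implements HasEventTime {
        final double value;
        final long time;

        Reading(double value, long time) {
            this.value = value;
            this.time = time;
        }

        @Override
        public long eventTime() {
            return time;
        }
    }
}
```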
Upvotes: 1
Views: 61
Reputation: 43707
The timestamp passed into the `onEvent` method is the current timestamp in the `StreamRecord` envelope wrapping your event. This is whatever timestamp has previously been assigned to this event; e.g., in the case of Kafka, this might be the timestamp carried in the Kafka record's metadata.
While this is typically redundant information, there are situations where it is useful to have access to the timestamp previously assigned upstream.
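A simplified, hypothetical model of that flow: a `Record` envelope stands in for `StreamRecord`, and an assigner shaped like `TimestampAssigner#extractTimestamp(element, recordTimestamp)` sees the previous timestamp before producing the new one. As far as I can tell this loosely follows what Flink's timestamps-and-watermarks operator does (read the envelope's timestamp, let the assigner produce the new one, write it back, then call `onEvent` with it), but `EnvelopeSketch`, `Record`, `Assigner`, `Generator` and `process` are all invented for illustration.

```java
// Hypothetical, simplified model; none of these types are Flink's.
class EnvelopeSketch {

    // Stand-in for StreamRecord: the event plus the timestamp travelling with it.
    static final class Record<T> {
        final T value;
        long timestamp;
        boolean hasTimestamp;

        Record(T value, long timestamp, boolean hasTimestamp) {
            this.value = value;
            this.timestamp = timestamp;
            this.hasTimestamp = hasTimestamp;
        }
    }

    // Same shape as TimestampAssigner#extractTimestamp(element, recordTimestamp).
    interface Assigner<T> {
        long extractTimestamp(T element, long recordTimestamp);
    }

    // Reduced generator: just the onEvent callback, without an output.
    interface Generator<T> {
        void onEvent(T event, long eventTimestamp);
    }

    static <T> void process(Record<T> record, Assigner<T> assigner, Generator<T> generator) {
        // Timestamp assigned upstream (e.g. by the source), or Long.MIN_VALUE if there is none.
        long previous = record.hasTimestamp ? record.timestamp : Long.MIN_VALUE;
        long assigned = assigner.extractTimestamp(record.value, previous);
        // Write the result back into the envelope, then hand the same value to the generator.
        record.timestamp = assigned;
        record.hasTimestamp = true;
        generator.onEvent(record.value, assigned);
    }
}
```

If the assigner simply returns `recordTimestamp`, the generator receives the upstream timestamp unchanged; if it computes one from the payload, the generator receives that value instead, without having to know how it was derived.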
Upvotes: 1
Reputation: 7228
I think the logic is to have the `WatermarkGenerator` not make any assumptions about where the event time comes from.
A typical case is indeed when we, as developers, provide a `TimestampAssigner` a few steps earlier to extract an event time from each event. Even in this case, it is probably desirable not to repeat that logic in the `WatermarkGenerator`, to avoid coupling, especially if the extraction is more involved than just reading a single field. So that's a first motivation for providing the timestamp here.
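A hypothetical sketch of that decoupling: the extraction logic (a plain field read for one event type, ISO-8601 parsing for another) lives in per-type assigners, and a single generator class serves both types because it only ever receives a `long`. `Click`, `LogLine`, `MaxTimestampGenerator` and `feed` are invented names; `ToLongFunction` merely plays the role of Flink's `TimestampAssigner` here.

```java
import java.time.Instant;
import java.util.function.ToLongFunction;

// Hypothetical illustration: extraction lives in per-type assigners, the generator only sees longs.
class DecouplingSketch {

    // Event whose time is already epoch milliseconds.
    static final class Click {
        final String user;
        final long epochMillis;

        Click(String user, long epochMillis) {
            this.user = user;
            this.epochMillis = epochMillis;
        }
    }

    // Event whose time must be parsed from an ISO-8601 string.
    static final class LogLine {
        final String text;
        final String isoTime;

        LogLine(String text, String isoTime) {
            this.text = text;
            this.isoTime = isoTime;
        }
    }

    // Extraction logic, written once per event type.
    static final ToLongFunction<Click> CLICK_TIME = c -> c.epochMillis;
    static final ToLongFunction<LogLine> LOG_TIME = l -> Instant.parse(l.isoTime).toEpochMilli();

    // One generator class for every event type: it never looks inside the event.
    static final class MaxTimestampGenerator<T> {
        long maxTimestamp = Long.MIN_VALUE;

        void onEvent(T event, long eventTimestamp) {
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        }
    }

    // Runs the assigner first, then hands only the resulting long to the generator.
    static <T> void feed(T event, ToLongFunction<T> assigner, MaxTimestampGenerator<T> generator) {
        generator.onEvent(event, assigner.applyAsLong(event));
    }
}
```

If the parsing rule for `LogLine` changes, only `LOG_TIME` needs updating; the generator is untouched.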
Another typical case is when the event time is provided by the data source itself, independently of any field in each event. For example, one could choose to use the Kafka connector in such a way, letting it obtain an event time from the Kafka timestamp metadata, independently of the event payload.
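A hypothetical sketch of the source-provided case: the payload is a bare `String` with no time field, and the timestamp comes from record metadata attached by the source (`SourceRecord` stands in for something like a Kafka record; it is not a real connector type, and `LatestTimestampGenerator` and `emit` are likewise invented). The generator works unchanged because it only needs the `eventTimestamp` it is handed. If I recall correctly, this is also what happens in Flink when a `WatermarkStrategy` is used without a timestamp assigner: the default `RecordTimestampAssigner` just keeps the record's existing timestamp.

```java
// Hypothetical sketch: the timestamp comes from source metadata, not from the payload.
class SourceTimestampSketch {

    // Stand-in for a broker record: an opaque payload plus a metadata timestamp.
    static final class SourceRecord {
        final String payload; // no time field anywhere in here
        final long metadataTimestamp;

        SourceRecord(String payload, long metadataTimestamp) {
            this.payload = payload;
            this.metadataTimestamp = metadataTimestamp;
        }
    }

    // Generator that only needs the timestamp it is handed.
    static final class LatestTimestampGenerator {
        long latest = Long.MIN_VALUE;

        void onEvent(String event, long eventTimestamp) {
            latest = Math.max(latest, eventTimestamp);
        }
    }

    // The source attaches the metadata timestamp; the payload is passed through unchanged.
    static void emit(SourceRecord record, LatestTimestampGenerator generator) {
        generator.onEvent(record.payload, record.metadataTimestamp);
    }
}
```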
Upvotes: 1