Reputation: 81
I'm trying to implement a CEP pattern in Flink on a stream of out-of-order events. My stream is built this way:
DataStream<DataInput> input = inputStream.flatMap(
        new FlatMapFunction<String, DataInput>() {
            @Override
            public void flatMap(String value, Collector<DataInput> out) throws Exception {
                // One input can generate multiple DataInput elements
                for (DataInput input : JsonUtilsJackson.getInstance().initTrackingDataFromJson(value)) {
                    out.collect(input);
                }
            }
        })
        // Elements can arrive late
        .assignTimestampsAndWatermarks(
            WatermarkStrategy.<DataInput>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                // The timestamp is based on the data's own timestamp, not on Kinesis
                .withTimestampAssigner((event, timestamp) -> event.getGeneratedDate().toEpochSecond()))
        // CEP by key
        .keyBy(requestId -> requestId.getTrackingData().getEntityReference());
And my pattern is attached to my stream with the code below:
SingleOutputStreamOperator<DataOutput> enterStream = CEP.pattern(
        input,
        PatternStrategy.getPattern()
    ).process(new SpecificProcess());
My understanding of forBoundedOutOfOrderness is that if an element is injected at 11:01:00 with a generatedDate field of 10:00:00, it will accept all elements with a generatedDate field between 09:59:50 and 10:00:00 and sort them in ascending order.
What I don't understand is how the periodic watermark check is managed. Since the watermark does not depend on the time at which I read from Kinesis (11:01:00 in my example), how does Flink decide that it no longer has to wait? Is that linked to periodic watermark generation plus the out-of-orderness bound?
During my tests, the pattern is triggered only once and never again. While debugging, I see in CepOperator.onEventTime that events are buffered correctly, but their timestamp is always <= timerService.currentWatermark().
So if someone has an explanation, it would help me. Thanks.
By the way, is there a way to have a watermark per key on a KeyedStream? My different entities do not have the same lifetime, and I miss some events.
Upvotes: 0
Views: 1174
Reputation: 43499
Your question isn't entirely clear, but perhaps the information below will help you.
The role that watermarks play is that they sit at a particular spot in the stream and mark that spot with a timestamp that indicates completeness: at that spot in the stream, no further events are expected with timestamps less than the one in the watermark.
Watermarks don't sort the stream, but they can be used for sorting. This is what CEP does when it is used in event time mode.
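To make the sorting idea concrete, here is a minimal plain-Java sketch (no Flink dependency) of watermark-driven sorting: events are buffered, and a buffered event is released only once a watermark with a timestamp at or beyond its own arrives. The class `EventTimeSorter` and its methods are hypothetical names for illustration; this is a simplified model of the idea, not the actual CepOperator code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Simplified model of event-time sorting driven by watermarks. */
class EventTimeSorter {
    // Buffered event timestamps (epoch milliseconds), smallest first.
    private final PriorityQueue<Long> buffer = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;

    /** Buffers an event; returns false if it is late (timestamp <= current watermark). */
    boolean onEvent(long timestampMillis) {
        if (timestampMillis <= currentWatermark) {
            return false; // late: the watermark already declared this time complete
        }
        buffer.add(timestampMillis);
        return true;
    }

    /** Advances the watermark and returns, in ascending order, the buffered events it covers. */
    List<Long> onWatermark(long watermarkMillis) {
        currentWatermark = Math.max(currentWatermark, watermarkMillis);
        List<Long> released = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek() <= currentWatermark) {
            released.add(buffer.poll());
        }
        return released;
    }

    public static void main(String[] args) {
        EventTimeSorter sorter = new EventTimeSorter();
        sorter.onEvent(3_000L);
        sorter.onEvent(1_000L);
        sorter.onEvent(2_000L);
        System.out.println(sorter.onWatermark(2_500L)); // [1000, 2000]
        System.out.println(sorter.onEvent(2_200L));     // false (late)
        System.out.println(sorter.onWatermark(3_000L)); // [3000]
    }
}
```

In this model, nothing leaves the buffer until the watermark moves, which is why a stalled watermark leaves events buffered but never processed.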
forBoundedOutOfOrderness is a watermark strategy that produces watermarks periodically (by default, every 200 msec). But the watermark will only advance if there have been new events since the last watermark that can be used as justification for a larger watermark (i.e., at least one event with a larger timestamp).
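As a rough illustration of that behaviour, the plain-Java sketch below models a bounded-out-of-orderness generator: it tracks the largest timestamp seen so far and, on each periodic emit, reports that maximum minus the bound minus 1 ms. The class name `BoundedOutOfOrdernessSketch` is hypothetical, and the code is a model of the strategy rather than Flink's own implementation. Timestamps are epoch milliseconds.

```java
/** Simplified model of a bounded-out-of-orderness watermark generator. */
class BoundedOutOfOrdernessSketch {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Chosen so that the first watermark is Long.MIN_VALUE without overflow.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    /** Called for every event: only a larger timestamp moves the maximum. */
    void onEvent(long timestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, timestampMillis);
    }

    /** Called periodically (every 200 ms by default in Flink): returns the current watermark. */
    long onPeriodicEmit() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch generator = new BoundedOutOfOrdernessSketch(10_000L);
        generator.onEvent(100_000L);
        System.out.println(generator.onPeriodicEmit()); // 89999
        generator.onEvent(95_000L);                     // older event: no progress
        System.out.println(generator.onPeriodicEmit()); // 89999
        System.out.println(generator.onPeriodicEmit()); // 89999 (no new events at all)
        generator.onEvent(120_000L);
        System.out.println(generator.onPeriodicEmit()); // 109999
    }
}
```

The watermark is derived purely from event timestamps, not from wall-clock or Kinesis read time, so it stands still when no event with a larger timestamp arrives. Flink interprets event timestamps as epoch milliseconds; if a timestamp assigner returned epoch seconds instead (as `toEpochSecond()` does), the 10,000 ms bound would be subtracted from a value counted in seconds, so the watermark would trail by roughly 10,000 seconds of event time. That may be worth checking in the question's code.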
Flink does not support per-key watermarking. But the FlinkKinesisConsumer supports per-shard watermarking, which may help. This will cause the shards with the most lag to hold back the watermarks, and this will avoid there being so many late events. And if you use a separate shard for each key, then you will have something similar to per-key watermarking.
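The "most lagging shard holds back the watermark" effect can be sketched in plain Java as taking the minimum over per-shard watermarks. The class `PerShardWatermarkSketch` and the shard IDs are hypothetical, chosen only for illustration; this models the combination rule and is not the FlinkKinesisConsumer implementation.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model: the overall watermark is the minimum of the per-shard watermarks. */
class PerShardWatermarkSketch {
    private final Map<String, Long> shardWatermarks = new HashMap<>();

    /** Records a shard's watermark; a shard's watermark never moves backwards. */
    void updateShard(String shardId, long watermarkMillis) {
        shardWatermarks.merge(shardId, watermarkMillis, Long::max);
    }

    /** The slowest shard determines the combined watermark. */
    long combinedWatermark() {
        return shardWatermarks.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        PerShardWatermarkSketch tracker = new PerShardWatermarkSketch();
        tracker.updateShard("shard-A", 10_000L);
        tracker.updateShard("shard-B", 4_000L);
        System.out.println(tracker.combinedWatermark()); // 4000
        tracker.updateShard("shard-B", 12_000L);
        System.out.println(tracker.combinedWatermark()); // 10000
    }
}
```

With one key per shard, an entity whose events lag behind keeps the combined watermark low until it catches up, so its events are less likely to be treated as late.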
Upvotes: 1