Reputation: 2922
I wrote a Flink CEP piece that checks for a pattern of statuses (keyed by an id) with relaxed contiguity (followedBy). The idea is to raise an alert if a particular status did not arrive after the first within a specified time.
This works; however, if no more messages arrive on this stream, the alert is never triggered. It only fires once a message with some other status arrives.
So how do I make it trigger an alert when the next message in the sequence does not arrive within the time limit, even if no further messages arrive on this stream?
Pattern<Transaction, Transaction> pattern = Pattern.<Transaction>begin("start")
    .where(new SimpleCondition<Transaction>() {
        private static final long serialVersionUID = 1L;
        @Override
        public boolean filter(Transaction value) throws Exception {
            return value.getStatus().equals(Status.STATUS_1);
        }
    })
    .followedBy("end")
    .where(new SimpleCondition<Transaction>() {
        @Override
        public boolean filter(Transaction value) throws Exception {
            return value.getStatus().equals(Status.STATUS_2);
        }
    }).within(Time.seconds(15));
PatternStream<Transaction> patternStream = CEP.pattern(dataStreamSource, pattern);
// Timed-out partial matches are routed to this side output
OutputTag<Alert> timedOutPartialMatchesTag = new OutputTag<Alert>("alert",
    TypeInformation.of(Alert.class)) {};
SingleOutputStreamOperator<Alert> select = patternStream.flatSelect(timedOutPartialMatchesTag,
    new PatternFlatTimeoutFunction<Transaction, Alert>() {
        @Override
        public void timeout(Map<String, List<Transaction>> values, long timeoutTimestamp, Collector<Alert> out)
                throws Exception {
            Transaction failedTrans = values.get("start").get(0);
            out.collect(new Alert("status_2 didnt arrive in time, ", failedTrans));
        }
    }, new PatternFlatSelectFunction<Transaction, Alert>() {
        @Override
        public void flatSelect(Map<String, List<Transaction>> match, Collector<Alert> out)
                throws Exception {
            // do not do anything
        }
    });
select.getSideOutput(timedOutPartialMatchesTag).print();
Upvotes: 0
Views: 197
Reputation: 43707
If you are working with event time, then the within() method is waiting for a watermark to trigger an event time timer. But with no events arriving, the watermark won't advance (assuming you are using something like the BoundedOutOfOrdernessTimestampExtractor for generating watermarks).
If you need to detect the passage of time while no events are arriving, then it's necessary to use processing time. You could set the TimeCharacteristic to processing time, or you could implement a watermark generator that uses processing time timers to artificially advance the watermark despite the lack of events.
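To make the second option more concrete, here is a rough, dependency-free sketch of the idea. It is not Flink's API: the class name IdleAdvancingWatermark, its parameters (maxOutOfOrdernessMs, idleTimeoutMs) and the injectable clock are all hypothetical names chosen for illustration. It only models the logic such a generator might follow: track the highest event timestamp as usual, and once no event has been seen for a while, let elapsed wall-clock time push the watermark forward.

```java
import java.util.function.LongSupplier;

// Hypothetical, Flink-free model of a watermark that keeps advancing while the stream is idle.
class IdleAdvancingWatermark {
    private final long maxOutOfOrdernessMs; // usual bounded-out-of-orderness slack
    private final long idleTimeoutMs;       // wall-clock silence tolerated before advancing artificially
    private final LongSupplier clock;       // processing-time clock, injectable for testing

    private long maxEventTs = Long.MIN_VALUE;
    private long lastEventWallTime;
    private long lastEmitted = Long.MIN_VALUE;

    IdleAdvancingWatermark(long maxOutOfOrdernessMs, long idleTimeoutMs, LongSupplier clock) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
        this.clock = clock;
        this.lastEventWallTime = clock.getAsLong();
    }

    // Call for every incoming event with its event-time timestamp.
    void onEvent(long eventTimestampMs) {
        maxEventTs = Math.max(maxEventTs, eventTimestampMs);
        lastEventWallTime = clock.getAsLong();
    }

    // Call periodically (e.g. from a processing-time timer); returns the watermark to emit.
    long currentWatermark() {
        if (maxEventTs == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // no event seen yet, nothing to base a watermark on
        }
        long candidate = maxEventTs - maxOutOfOrdernessMs;
        long idleFor = clock.getAsLong() - lastEventWallTime;
        if (idleFor > idleTimeoutMs) {
            // Stream is idle: advance by the wall-clock time elapsed beyond the timeout.
            candidate += idleFor - idleTimeoutMs;
        }
        // Watermarks must never move backwards.
        lastEmitted = Math.max(lastEmitted, candidate);
        return lastEmitted;
    }
}
```

In a real job this logic would have to live inside a custom watermark generator whose periodic method is invoked by Flink, so that the 15 second within() timer eventually fires even when the stream is silent. The trade-off is that events arriving after an idle period, with timestamps behind the artificially advanced watermark, will be treated as late.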
Upvotes: 1