vvra
vvra

Reputation: 2922

Flink CEP - raise alert if a sequence didnt arrive within a time irrespective of any other message arrived

I wrote a Flink CEP piece that checks for a pattern of status (keyed by an id) with Relaxed Contiguity (followedBy). The idea is to raise an alert if a particular status did not arrive after the first within a specified time.

This works, however if there are nomore messages to this stream the alert is never triggered. But only when a message with some random status arrives this piece gets triggered.

So, how do I make it trigger an alert, even if no message arrives to this stream, when the message with next sequence did not arrive with a time?

Pattern<Transaction, Transaction> pattern = Pattern.<Transaction>begin("start")
            .where(new SimpleCondition<Transaction>() {

                private static final long serialVersionUID = 1L;

                @Override
                public boolean filter(Transaction value) throws Exception {
                    return value.getStatus().equals(Status.STATUS_1);
                }
            })
           .followedBy("end")
           .where(new SimpleCondition<Transaction>() {
                @Override
                public boolean filter(Transaction value) throws Exception {
                    return value.getStatus().equals(Status.STATUS_2);
                    return amlveri;
                }
            }).within(Time.seconds(15));

PatternStream<Transaction> patternStream = CEP.pattern(dataStreamSource, pattern);
OutputTag<Alert> timedOutPartialMatchesTag = new OutputTag<Alert>("alert",
            TypeInformation.of(Alert.class)) {};
    SingleOutputStreamOperator<Alert> select = patternStream.flatSelect(timedOutPartialMatchesTag,
            new PatternFlatTimeoutFunction<Transaction, Alert>() {

                @Override
                public void timeout(Map<String, List<Transaction>> values, long arg1, Collector<Alert> arg2)
                        throws Exception {
                    Transaction failedTrans = values.get("start").get(0);
                    arg2.collect(new Alert("status_2 didnt arrive in time, ", failedTrans));
                }
            }, new PatternFlatSelectFunction<Transaction, Alert>() {

                @Override
                public void flatSelect(Map<String, List<Transaction>> arg0, Collector<Alert> arg1)
                        throws Exception {
                       // do not do anything
                }
            });
    select.getSideOutput(timedOutPartialMatchesTag).print();

Upvotes: 0

Views: 197

Answers (1)

David Anderson
David Anderson

Reputation: 43707

If you are working with event time, then the within() method is waiting for a watermark to trigger an event time timer. But with no events arriving, the watermark won't advance (assuming you are using something like the BoundedOutOfOrdernessTimestampExtractor for generating watermarks).

If you need to detect the passage of time while no events are arriving, then it's necessary to use processing time. You could set the TimeCharacteristic to processing time, or you could implement a watermark generator that uses processing time timers to artificially advance the watermark despite the lack of events.

Upvotes: 1

Related Questions