Prashant Sahoo

Reputation: 1085

Generate a CEP event when the same data is found 5 times within 5 seconds in Apache Flink

I need to generate a CEP event when a string message starting with the character 'a' is received 5 times in a row within 5 seconds.

For that, I have written a class, CEPCharEventPublisher.java, which publishes string messages (shown below) to the Kafka topic 'charEvent'.

Published messages:

b; date- 2019-06-27 09:05:09.605
a; date- 2019-06-27 09:05:10.160
c; date- 2019-06-27 09:05:10.661
b; date- 2019-06-27 09:05:11.162
c; date- 2019-06-27 09:05:11.669
b; date- 2019-06-27 09:05:12.175
b; date- 2019-06-27 09:05:12.675
b; date- 2019-06-27 09:05:13.176
a; date- 2019-06-27 09:05:13.676
c; date- 2019-06-27 09:05:14.176
b; date- 2019-06-27 09:05:14.677
b; date- 2019-06-27 09:05:15.177
b; date- 2019-06-27 09:05:15.678
c; date- 2019-06-27 09:05:16.178
a; date- 2019-06-27 09:05:16.679
c; date- 2019-06-27 09:05:17.179
c; date- 2019-06-27 09:05:17.680
c; date- 2019-06-27 09:05:18.180
c; date- 2019-06-27 09:05:18.681
c; date- 2019-06-27 09:05:19.181
c; date- 2019-06-27 09:05:19.681
a; date- 2019-06-27 09:05:20.182
c; date- 2019-06-27 09:05:20.682
b; date- 2019-06-27 09:05:21.182
c; date- 2019-06-27 09:05:21.682
b; date- 2019-06-27 09:05:22.183
a; date- 2019-06-27 09:05:22.683
b; date- 2019-06-27 09:05:23.184
a; date- 2019-06-27 09:05:23.684
c; date- 2019-06-27 09:05:24.184
b; date- 2019-06-27 09:05:24.685
b; date- 2019-06-27 09:05:25.186
c; date- 2019-06-27 09:05:25.687
b; date- 2019-06-27 09:05:26.187
a; date- 2019-06-27 09:05:26.687
a; date- 2019-06-27 09:05:27.188
a; date- 2019-06-27 09:05:27.688
b; date- 2019-06-27 09:05:28.188
b; date- 2019-06-27 09:05:28.688

I also have a consumer, CEPCharEventConsumer.java, which reads messages from the Kafka topic charEvent and keeps only those that start with the character 'a'.

Then I wrote the following pattern to generate a CEP event/alert when 5 consecutive messages starting with the character 'a' arrive within 5 seconds.

Pattern<String, String> pattern = Pattern.<String>begin("start")
        .times(5).greedy()
        .where(new SimpleCondition<String>() {
            private static final long serialVersionUID = -6301755149429716724L;

            @Override
            public boolean filter(String value) throws Exception {
                return value.split(";")[0].equals("a");
            }
        })
        .within(Time.seconds(5));

The messages starting with the character 'a' that CEPCharEventConsumer.java received are printed below.

2> a; date- 2019-06-27 09:05:10.160
1> a; date- 2019-06-27 09:05:13.676
3> a; date- 2019-06-27 09:05:16.679
2> a; date- 2019-06-27 09:05:20.182
3> a; date- 2019-06-27 09:05:22.683
1> a; date- 2019-06-27 09:05:23.684
3> a; date- 2019-06-27 09:05:26.687
1> a; date- 2019-06-27 09:05:27.188
1> a; date- 2019-06-27 09:05:27.688
1> a; date- 2019-06-27 09:05:29.198
2> a; date- 2019-06-27 09:05:30.199

1> a; date- 2019-06-27 09:05:33.703
1> a; date- 2019-06-27 09:05:35.203
3> a; date- 2019-06-27 09:05:36.705
2> a; date- 2019-06-27 09:05:38.207
1> a; date- 2019-06-27 09:05:39.709
2> a; date- 2019-06-27 09:05:40.209
3> a; date- 2019-06-27 09:05:40.728

Printed alert message:

4> Found: a; date- 2019-06-27 09:05:26.687

In the output above, "Found: a; date- 2019-06-27 09:05:26.687" is the alert message.

I do not understand how Flink determines that five consecutive messages occurred within 5 seconds. I think something is wrong.

I am attaching the Git URL of the source code (flink-cep-char-event). Can anyone please correct it to meet my requirement?

Upvotes: 0

Views: 144

Answers (1)

David Anderson

Reputation: 43409

Your CEP-based application appears to be correctly reporting that these 5 messages

3> a; date- 2019-06-27 09:05:26.687
1> a; date- 2019-06-27 09:05:27.188
1> a; date- 2019-06-27 09:05:27.688
1> a; date- 2019-06-27 09:05:29.198
2> a; date- 2019-06-27 09:05:30.199

occurred within a 5 second interval.
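The arithmetic can be checked with a small standalone sketch. It does not use Flink; it only parses the timestamps embedded in the five messages above and measures the gap between the first and the last. The class and method names (MatchSpanCheck, timestampOf, spanMillis) are made up for this illustration, and it assumes the embedded timestamps are close to the times at which Flink actually saw the events.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the asker's project or of Flink.
class MatchSpanCheck {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Extracts the timestamp that follows "date- " in a message such as
    // "a; date- 2019-06-27 09:05:26.687".
    static LocalDateTime timestampOf(String message) {
        String marker = "date- ";
        int idx = message.indexOf(marker);
        return LocalDateTime.parse(message.substring(idx + marker.length()).trim(), FORMAT);
    }

    // Milliseconds between the first and the last message in the list.
    static long spanMillis(List<String> messages) {
        LocalDateTime first = timestampOf(messages.get(0));
        LocalDateTime last = timestampOf(messages.get(messages.size() - 1));
        return Duration.between(first, last).toMillis();
    }

    public static void main(String[] args) {
        // The five messages from the match, without the "N> " subtask prefix.
        List<String> matched = Arrays.asList(
                "a; date- 2019-06-27 09:05:26.687",
                "a; date- 2019-06-27 09:05:27.188",
                "a; date- 2019-06-27 09:05:27.688",
                "a; date- 2019-06-27 09:05:29.198",
                "a; date- 2019-06-27 09:05:30.199");
        System.out.println("Span in ms: " + spanMillis(matched));
    }
}
```

The first and last of these messages are 3512 ms apart (09:05:26.687 to 09:05:30.199), which is inside the 5 second window.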

The processMatch method in your PatternProcessFunction is passed a Map<String, List<String>> match. In your case match.get("start") returns a list of the 5 matching events in the "start" clause of your pattern (which is the entire pattern).

So to produce a report that gives the time of the last matching event, rather than the first one, change

String start = match.get("start").get(0);
out.collect("Found: " + start);

to

String last = match.get("start").get(4);
out.collect("Found: " + last);

Upvotes: 1
