Reputation: 217
I am unable to compute a running sum within a single CEP pattern in Scala. I want to detect when the sum of amounts exceeds 6100 for a particular customer ID. I am passing a keyed stream to CEP.pattern(...). My pattern construction code is below.
val pattern1 = Pattern.begin[GenericRecord]("start").where((v, ctx) => {
  lazy val sum = ctx.getEventsForPattern("start").map(_.get("amount").toString.toInt).sum
  print((sum + v.get("amount").toString.toLong).toString)
  //print(sum+v.get("amount").toString.toLong>6100)
  //println(v.get("customer_id").toString+v.get("amount").toString+" , ")
  (sum + v.get("amount").toString.toLong) > 6100 && v.get("state").toString == "FAILED"
}).oneOrMore
My input is in Avro format and Flink consumes it from Kafka. The input looks like this:
```
{"trasanction_id":196,"customer_id":28,"datetime":"2017-09-01 12:35:08","amount":6094,"state":"FAILED"}
{"trasanction_id":198,"customer_id":27,"datetime":"2017-09-01 12:36:04","amount":6024,"state":"FAILED"}
{"trasanction_id":199,"customer_id":27,"datetime":"2017-09-01 12:36:05","amount":2399,"state":"FAILED"}
{"trasanction_id":197,"customer_id":28,"datetime":"2017-09-01 12:36:36","amount":547,"state":"FAILED"}
```
However, the code below works when I use two patterns:
val pattern1 = Pattern.begin[GenericRecord]("start").followedBy("middle").where((v, ctx) => {
  lazy val sum = ctx.getEventsForPattern("start").map(_.get("amount").toString.toInt).sum
  print((sum + v.get("amount").toString.toLong).toString)
  //print(sum+v.get("amount").toString.toLong>6100)
  //println(v.get("customer_id").toString+v.get("amount").toString+" , ")
  (sum + v.get("amount").toString.toLong) > 6100 && v.get("state").toString == "FAILED"
}).oneOrMore
Upvotes: 0
Views: 432
Reputation: 5073
`getEventsForPattern` returns only the events that have already been matched by the pattern. Let's analyze customer 27. When processing the event
{"trasanction_id":198,"customer_id":27,"datetime":"2017-09-01 12:36:04","amount":6024,"state":"FAILED"}
your first snippet rejects it because the condition is not satisfied: sum + amount = 0 + 6024 < 6100. Because the event is rejected, nothing is ever stored under "start", so the sum is still 0 when the next event arrives. The same happens for every other event. For example, when processing
{"trasanction_id":197,"customer_id":28,"datetime":"2017-09-01 12:36:36","amount":547,"state":"FAILED"}
the condition once again checks whether 0 + 547 > 6100, and this is why you see no output.
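The reasoning above can be checked with a small plain-Scala simulation. This is only a sketch with no Flink dependency: `Txn`, `SinglePatternSimulation`, and `accepted` are hypothetical names, and the fold is a simplified stand-in for how the condition sees `ctx.getEventsForPattern("start")`, not a model of the real CEP engine.

```scala
// Hypothetical record type standing in for the Avro GenericRecord.
final case class Txn(id: Int, customerId: Int, amount: Long, state: String)

object SinglePatternSimulation {
  val Threshold = 6100L

  // `acc` plays the role of ctx.getEventsForPattern("start"): it only
  // contains events that have already passed the condition.
  def accepted(events: Seq[Txn]): Vector[Txn] =
    events.foldLeft(Vector.empty[Txn]) { (acc, e) =>
      val sum = acc.map(_.amount).sum
      if (sum + e.amount > Threshold && e.state == "FAILED") acc :+ e else acc
    }

  // Sample events from the question, split by key as a keyed stream would be.
  val customer27: Seq[Txn] =
    Seq(Txn(198, 27, 6024L, "FAILED"), Txn(199, 27, 2399L, "FAILED"))
  val customer28: Seq[Txn] =
    Seq(Txn(196, 28, 6094L, "FAILED"), Txn(197, 28, 547L, "FAILED"))
}
```

For both customers `accepted` returns an empty collection: no single amount exceeds 6100, so no event is ever stored and the running sum never leaves 0.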
In your second example you are using the `followedBy` operator, which means that you are processing pairs of elements. The first transaction is accepted unconditionally (the "start" step has no `where` clause), so it is returned by the `ctx.getEventsForPattern("start")` call when the following event is evaluated. I hope you now understand the behaviour of this code.
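The pairwise behaviour can be sketched in plain Scala as well. This is a deliberate simplification: it only pairs each event with the one directly after it for the same key, whereas the real `followedBy(...).oneOrMore` can also match non-adjacent events and longer runs. `Txn`, `TwoStepSimulation`, and `matches` are hypothetical names.

```scala
// Hypothetical record type standing in for the Avro GenericRecord.
final case class Txn(id: Int, customerId: Int, amount: Long, state: String)

object TwoStepSimulation {
  val Threshold = 6100L

  // "start" accepts any event; "middle" is checked against the amount
  // already stored under "start" plus its own amount.
  def matches(events: Seq[Txn]): Seq[(Txn, Txn)] =
    events.zip(events.drop(1)).filter { case (start, middle) =>
      start.amount + middle.amount > Threshold && middle.state == "FAILED"
    }

  // Sample events from the question for customer 27.
  val customer27: Seq[Txn] =
    Seq(Txn(198, 27, 6024L, "FAILED"), Txn(199, 27, 2399L, "FAILED"))
}
```

For customer 27 the pair (198, 199) passes because 6024 + 2399 = 8423 exceeds 6100, which is consistent with the second snippet producing output.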
CEP is mostly used to spot patterns in data, not to aggregate it. Your problem can be approached by windowing followed by filtering; there is no need to use CEP here.
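As a rough illustration of "aggregate, then filter", here is a minimal sketch on plain Scala collections rather than the Flink API. It assumes the events of one window have already been collected into a `Seq`; `Txn`, `SumThenFilter`, and `customersOverThreshold` are hypothetical names. In an actual Flink job the same idea would presumably be expressed with a keyed stream, a window, an aggregation, and a filter.

```scala
// Hypothetical record type standing in for the Avro GenericRecord.
final case class Txn(id: Int, customerId: Int, amount: Long, state: String)

object SumThenFilter {
  val Threshold = 6100L

  // Sum the FAILED amounts per customer within one window, then keep
  // only the customers whose total exceeds the threshold.
  def customersOverThreshold(window: Seq[Txn]): Map[Int, Long] =
    window
      .filter(_.state == "FAILED")
      .groupBy(_.customerId)
      .map { case (customer, txns) => customer -> txns.map(_.amount).sum }
      .filter { case (_, total) => total > Threshold }

  // The four sample events from the question, treated as one window.
  val sample: Seq[Txn] = Seq(
    Txn(196, 28, 6094L, "FAILED"),
    Txn(198, 27, 6024L, "FAILED"),
    Txn(199, 27, 2399L, "FAILED"),
    Txn(197, 28, 547L, "FAILED")
  )
}
```

On the sample data, `SumThenFilter.customersOverThreshold(SumThenFilter.sample)` reports both customers, with totals 8423 for customer 27 and 6641 for customer 28. The window size is left open here because the question does not state one.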
Upvotes: 3