Kannan

Reputation: 5

WSO2 CEP: Siddhi QL: How to alert on events consistently after a condition is matched

From the stream below, I want to raise an alert on every second event in which the temperature exceeds 90 (that is, every 2nd event with temp > 90 should be alerted).

InputStream=[1001,91]
InputStream=[1001,86]
InputStream=[1002,70]
InputStream=[1001,85]
InputStream=[1003,70]
InputStream=[1003,85]
InputStream=[1002,70]
InputStream=[1003,70]
InputStream=[1003,87]
InputStream=[1002,70]
InputStream=[1001,95]
InputStream=[1001,96]
InputStream=[1001,97]
InputStream=[1001,98]
InputStream=[1001,98]

I have written something like this:

@Plan:name('TestExecutionPlan')

define stream InputStream (id string, temp int);

partition with (id of InputStream)
begin
from InputStream
select id, temp
having temp > 90  
insert into CriticalStream
end;

from CriticalStream[count(id) == 2]
select id, temp
group by id
--having count(id) == 2
insert into EventReporter;

However, it is alerting only 1 event in the EventReporter stream.

Below is the screenshot from Try It:

I expect the EventReporter stream to contain [1001,97] and [1001,98] as well, but right now it contains only the record for [1001,95]. Could someone please point out what I am doing wrong here? How can I loop through the events after grouping them? I tried adding window.time and window.length, but did not get the desired output. Any help or guidance would be really appreciated. Thank you.

Upvotes: 0

Views: 288

Answers (1)

Grainier

Reputation: 1654

You don't need a partition here. A filter and a lengthBatch window are enough to get the desired output. Try the execution plan below:

@Plan:name('ExecutionPlan')

@Import('InputStream:1.0.0')
define stream InputStream (id string, temp int);

/* Filter events with temp > 90 */
from InputStream[temp > 90]
insert into CriticalStream;

/* Aggregate within a lengthBatch window of 2 events, grouping by id */
from CriticalStream#window.lengthBatch(2)
select id, temp, count() as count
group by id
insert into EventReporter;

/* Just for logging the result in the console */
from EventReporter#log("Logging EventReporter : ")
insert into #temp;
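
To see what the plan above should emit for the sample input in the question, here is a rough plain-Python model of the filter plus lengthBatch(2) plus group by steps. It is only a sketch, not Siddhi itself: the function name `length_batch_group_by` and the `EVENTS` list are made up for illustration, and it assumes that each full batch yields one row per id, carrying the temp of the last event for that id in the batch and the count within that batch.

```python
from collections import OrderedDict

# Sample input from the question, as (id, temp) pairs.
EVENTS = [
    ("1001", 91), ("1001", 86), ("1002", 70), ("1001", 85), ("1003", 70),
    ("1003", 85), ("1002", 70), ("1003", 70), ("1003", 87), ("1002", 70),
    ("1001", 95), ("1001", 96), ("1001", 97), ("1001", 98), ("1001", 98),
]


def length_batch_group_by(events, threshold=90, batch_size=2):
    """Hypothetical model of: filter temp > threshold, batch, group by id."""
    # Step 1: the filter, like InputStream[temp > 90].
    critical = [(id_, temp) for id_, temp in events if temp > threshold]

    results = []
    # Step 2: walk over full batches only; a trailing partial batch
    # is held back, as a batch window would do until it fills up.
    for start in range(0, len(critical) - batch_size + 1, batch_size):
        batch = critical[start:start + batch_size]

        # Step 3: group by id inside the batch (assumption: one output
        # row per id, with the last temp seen and the per-batch count).
        groups = OrderedDict()
        for id_, temp in batch:
            count = groups[id_][1] + 1 if id_ in groups else 1
            groups[id_] = (temp, count)

        results.extend((id_, temp, count) for id_, (temp, count) in groups.items())
    return results


print(length_batch_group_by(EVENTS))
```

Under those assumptions the model reports ("1001", 95, 2), ("1001", 97, 2) and ("1001", 98, 2), which lines up with the [1001,95], [1001,97] and [1001,98] rows the question expects. Note that the batch here is filled by all critical events regardless of id, so if several ids go above 90 at the same time, a single batch can mix ids and the per-id count can be 1.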

Upvotes: 1
