Reputation: 1
I have set up a Siddhi cluster with two worker nodes and one manager node. Below is the Siddhi app I tried deploying through the manager. Everything works fine when there is no join in the query, but with the join shown below, the app gets deployed on the worker node yet no output events are produced.
@app:name("rule_1")
@source(
type="kafka",
topic.list="test-input-topic",
group.id="test-group",
threading.option="single.thread",
bootstrap.servers="localhost:9092",
@Map(type="json"))
define stream TempStream (deviceID string,roomNo string,temp int );
@sink(
type="kafka",
topic="test-output-topic",
bootstrap.servers="localhost:9092",
@Map(type="json"))
define stream OutStream (message string, message1 string, message2 double);
@info(name = "query1")
@dist(execGroup="group1")
from TempStream[deviceID=="rule_1" and temp>10]#window.time(5 sec)
select avg(temp) as avgTemp, roomNo, deviceID
insert all events into AvgTempStream1;
@info(name = "query2")
@dist(execGroup="group2")
from TempStream[deviceID=="rule_1" and temp<10]#window.time(5 sec)
select avg(temp) as avgTemp, roomNo, deviceID
insert all events into AvgTempStream2;
@info(name = "query3")
@dist(execGroup="group3")
from AvgTempStream1#window.length(1) as stream1 join AvgTempStream2#window.length(1) as stream2
select stream1.deviceID as message,stream1.roomNo as message1, stream1.avgTemp as message2
having stream1.avgTemp>stream2.avgTemp
insert into outputStream;
@info(name = "query4")
@dist(execGroup="group4")
from AvgTempStream1[deviceID=="rule_1"]
select deviceID as message, roomNo as message1, avgTemp as message2
insert into OutStream;
Event being passed:
{"event":{"deviceID":"rule_1","roomNo":"123","temp":12}}
Upvotes: 0
Views: 72
Reputation: 890
According to the above Siddhi app, you will need to send two input events for it to produce an output: one event with temp>10 and another with temp<10. For example:
{"event":{"deviceID":"rule_1","roomNo":"123","temp":12}}
{"event":{"deviceID":"rule_1","roomNo":"123","temp":8}}
Sending both events makes sure that the join happens and an output event is emitted. For troubleshooting, you can subscribe to the intermediate Kafka topics with a Kafka consumer. The names of the intermediate topics follow the format SiddhiAppName.StreamName, e.g. rule_1.AvgTempStream1, as shown in the sketch below.
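For example, to watch the intermediate topic feeding the join (again assuming the Kafka CLI tools and the same localhost:9092 broker):

# Consume the intermediate topic created for AvgTempStream1
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic rule_1.AvgTempStream1 --from-beginning

If nothing shows up here, the problem is upstream of the join (query1/query2 or the source); if events appear on both rule_1.AvgTempStream1 and rule_1.AvgTempStream2 but nothing reaches test-output-topic, look at the join execution group.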
Hope this helps!!
Thanks, Tishan
Upvotes: 1