user3164398

Reputation: 1

Issue with fully distributed deployment of a Siddhi query containing a join operation

I have set up a Siddhi cluster with two worker nodes and one manager node. Below is the query I tried pushing to the manager. Everything works fine when the query has no join, but with the join shown below, the query gets deployed on the worker nodes and the events don't seem to satisfy it.

@app:name("rule_1")
@source(
  type="kafka",
  topic.list="test-input-topic",
  group.id="test-group",
  threading.option="single.thread",
  bootstrap.servers="localhost:9092",
  @Map(type="json"))
 define stream TempStream (deviceID string,roomNo string,temp int );

 @sink(
  type="kafka",
  topic="test-output-topic",
  bootstrap.servers="localhost:9092",
  @Map(type="json"))
 define stream OutStream (message string, message1 string, message2 double);

 @info(name = "query1")
 @dist(execGroup="group1")
 from TempStream[deviceID=="rule_1" and temp>10]#window.time(5 sec)
 select avg(temp) as avgTemp, roomNo, deviceID
 insert all events into AvgTempStream1;

 @info(name = "query2")
 @dist(execGroup="group2")
 from TempStream[deviceID=="rule_1" and temp<10]#window.time(5 sec)
 select avg(temp) as avgTemp, roomNo, deviceID
 insert all events into AvgTempStream2;

 @info(name = "query3")
 @dist(execGroup="group3")
 from AvgTempStream1#window.length(1) as stream1 join AvgTempStream2#window.length(1) as stream2
 select stream1.deviceID as message,stream1.roomNo as message1, stream1.avgTemp as message2
 having stream1.avgTemp>stream2.avgTemp
 insert into outputStream;

 @info(name = "query4")
 @dist(execGroup="group4")
 from AvgTempStream1[deviceID=="rule_1"]
 select deviceID as message, roomNo as message1, avgTemp as message2
 insert into OutStream;

The event being sent:

{"event":{"deviceID":"rule_1","roomNo":"123","temp":12}}

Upvotes: 0

Views: 72

Answers (1)

Tishan

Reputation: 890

According to the Siddhi app above, you need to send two input events for it to produce an output: one with temp>10 and another with temp<10. For example:

{"event":{"deviceID":"rule_1","roomNo":"123","temp":12}}

{"event":{"deviceID":"rule_1","roomNo":"123","temp":8}}
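To see why one event is not enough, here is a rough Python model of query3. It is only a sketch, not Siddhi itself: the class `LengthOneJoin` and the function `send` are made-up names, the 5 second time windows and the expired events from `insert all events` are ignored, and each average is taken over a single event.

```python
from typing import Optional


class LengthOneJoin:
    """Toy model of the inner join between two length(1) windows in query3."""

    def __init__(self) -> None:
        self.left: Optional[dict] = None   # latest AvgTempStream1 event
        self.right: Optional[dict] = None  # latest AvgTempStream2 event

    def emit(self) -> list:
        # An inner join produces nothing until both windows hold an event.
        if self.left is None or self.right is None:
            return []
        # Mirrors: having stream1.avgTemp > stream2.avgTemp
        if self.left["avgTemp"] > self.right["avgTemp"]:
            return [{
                "message": self.left["deviceID"],
                "message1": self.left["roomNo"],
                "message2": self.left["avgTemp"],
            }]
        return []


def send(join: LengthOneJoin, event: dict) -> list:
    """Route one TempStream event through the query1/query2 filters (simplified)."""
    if event["deviceID"] != "rule_1":
        return []
    avg_event = {
        "deviceID": event["deviceID"],
        "roomNo": event["roomNo"],
        "avgTemp": float(event["temp"]),  # assumption: average of a single event
    }
    if event["temp"] > 10:      # query1 filter
        join.left = avg_event
    elif event["temp"] < 10:    # query2 filter
        join.right = avg_event
    else:                       # temp == 10 passes neither filter
        return []
    return join.emit()


join = LengthOneJoin()
first = send(join, {"deviceID": "rule_1", "roomNo": "123", "temp": 12})
second = send(join, {"deviceID": "rule_1", "roomNo": "123", "temp": 8})
print(first)   # [] because the AvgTempStream2 side is still empty
print(second)  # one joined event, since 12.0 > 8.0
```

With only the temp 12 event, the AvgTempStream2 side of the join stays empty, so nothing can come out of query3.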

Sending both events ensures that the join happens and an event is emitted. For troubleshooting, you can subscribe to the intermediate Kafka topics with a Kafka consumer. Intermediate topic names follow the format SiddhiAppName.StreamName, for example rule_1.AvgTempStream1.
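A minimal sketch of such a consumer in Python, assuming the third-party kafka-python package is installed and the broker is at localhost:9092 as in the question. The helper names `intermediate_topic` and `tail_topic` are hypothetical, and the record values are printed as raw bytes because the encoding used on the intermediate topics is not stated here.

```python
def intermediate_topic(app_name: str, stream_name: str) -> str:
    """Build a topic name in the SiddhiAppName.StreamName format."""
    return f"{app_name}.{stream_name}"


def tail_topic(topic: str, bootstrap_servers: str = "localhost:9092") -> None:
    """Print every record on `topic`; needs a running broker and kafka-python."""
    from kafka import KafkaConsumer  # third-party: pip install kafka-python

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset="earliest",  # start from the oldest retained record
        consumer_timeout_ms=10000,     # stop iterating after 10 s without records
    )
    try:
        for record in consumer:
            # Raw bytes: no assumption about how the payload is encoded.
            print(record.topic, record.offset, record.value)
    finally:
        consumer.close()


print(intermediate_topic("rule_1", "AvgTempStream1"))  # rule_1.AvgTempStream1
```

Calling `tail_topic(intermediate_topic("rule_1", "AvgTempStream1"))` and then the same for `AvgTempStream2` should show which side of the join is not receiving events.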

Hope this helps!!

Thanks, Tishan

Upvotes: 1
