Reputation: 3
I am facing a non-expected behaviour when using the clause output every
along with table join
clause.
input
stream, and 2 tables, which store a different list of values. Then, there are also 2 queries,query1
will join with table1
, and when there is a match will output first every 5 sec
.query2
will do similarly, will join table2
, and will output first value found every 5 sec.input
stream which is contained into table 1, there will be a match, and if there is a value contained into table 2, there will be a different match, and both queries will keep silent until next 5 seconds block.the app is the following
@App:name("delays_tables_join")
define stream input(value string);
define stream table_input(value string);
define table table1(value string);
define table table2(value string);
@sink(type='log')
define stream LogStream (value string);
-- fill table1
@info(name='insert table 1')
from table_input[value == '1']
insert into table1;
-- fill table2
@info(name='insert table 2')
from table_input[value == '2']
insert into table2;
-- query input join with table 1, output once every 5 sec
@info(name='query1')
from input join table1 on input.value == table1.value
select input.value
output first every 5 sec
insert into LogStream;
-- query input join with table 2, output once every 5 sec
@info(name='query2')
from input join table2 on input.value == table2.value
select input.value
output first every 5 sec
insert into LogStream;
When this app is run,first its sent to table_input
the values 1
, and 2
to fill both tables
And then, it starts sending to the input stream repeatedly values: 1
, 2
, 1
, 2
, 1
, 2
...
It is expected to have in LogStream
2 values every 5 seconds, the first appearance of 1
value, and the first appearance of value 2
.
But instead, just the first occurrence of value 1
appears all the time, but not the value 2
[2020-04-02_18-55-16_498] INFO {io.siddhi.core.stream.output.sink.LogSink} - delays_tables_join : LogStream : Event{timestamp=1585846516098, data=[1], isExpired=false}
[2020-04-02_18-55-21_508] INFO {io.siddhi.core.stream.output.sink.LogSink} - delays_tables_join : LogStream : Event{timestamp=1585846521098, data=[1], isExpired=false}
Please, note that, when there are no table joins involved, both queries work as expected. Example without joins:
@App:name("delays")
define stream Input(value string);
@sink(type='log')
define stream LogStream (value string);
@info(name='query1')
from Input[value == '1']
select value
output first every 5 sec
insert into LogStream;
@info(name='query2')
from Input[value == '2']
select value
output first every 5 sec
insert into LogStream;
this will produce the following output:
[2020-04-02_18-53-50_305] INFO {io.siddhi.core.stream.output.sink.LogSink} - delays : LogStream : Event{timestamp=1585846430304, data=[1], isExpired=false}
[2020-04-02_18-53-50_706] INFO {io.siddhi.core.stream.output.sink.LogSink} - delays : LogStream : Event{timestamp=1585846430305, data=[2], isExpired=false}
[2020-04-02_18-53-55_312] INFO {io.siddhi.core.stream.output.sink.LogSink} - delays : LogStream : Event{timestamp=1585846438305, data=[1], isExpired=false}
[2020-04-02_18-53-56_114] INFO {io.siddhi.core.stream.output.sink.LogSink} - delays : LogStream : Event{timestamp=1585846439305, data=[2], isExpired=false}
.
I was wondering if this behaviour is expected, or there is any error at all in the design of the application.
Many thanks!
Upvotes: 0
Views: 167
Reputation: 168
I was able to get the results as in the "without join" by fixing the "insert table 2" query by changing the table1 to table2 in insert into line
-- fill table2
@info(name='insert table 2')
from table_input[value == '2']
insert into table1;
Upvotes: 0