gpk
gpk

Reputation: 1

Siddhi - Fetching from Event tables, which are not updated within certain time

In Siddhi query, I am importing two stream S1 and S2. If I receive in S1 stream I will insert in event table T1, and when I receive in S2 I will update in the T1 table based on the id, and also I will send the updated values from the table into Output stream O1.

As a part of the requirement, I need to get the content which table T1, which is inserted before 5 min(ie, if a record resides more than 5 min) and send to another output stream O2.

@name('S1')
from S1
select id, srcId, 'null' as msgId, 'INP' as status
insert into StatusTable;

@name('S2')
from S2#window.time(1min) as g join StatusTable[t.status == 'INP'] as t
on ( g.srcId == t.id)
select t.id as id, g.msgId as msgId, 'CMP' as status
update StatusTable on TradeStatusTable.id == id;

@name('Publish')
from S2 as g join StatusTable[t.status == 'CMP'] as t on ( g.srcId == t.id and t.status == 'CMP')
select t.id as id, t.msgId as msgId, t.status as status
insert into O1;

How to add a query in this existing query to fetch the records from TradeStatus table, which receides more than 5 minutes. Since the table cannot be used alone, I need to join it with a stream, how to do this scenario?

Upvotes: 0

Views: 155

Answers (1)

cochar
cochar

Reputation: 1

String WebAttackSuccess = "" +
           "@info(name = 'found_host_charged1') "+
           "from ATDEventStream[ rid == 10190001 ]#window.timeBatch(10 sec) as a1 "+
           "join ATDEventStream[ rid == 10180004 ]#window.time(10 sec) as a2 on a2.src_ip == a1.src_ip and a2.dst_ip == a1.dst_ip " +
           " select UUID() as uuid,1007 as cid,a1.sensor_id as sensor_id,a1.interface_id as interface_id,a1.other_id as other_id,count(a1.uuid) as event_num,min(a1.timestamp)  as first_seen,max(a2.timestamp) as last_seen,'' as IOC,a1.dst_ip as victim,a1.src_ip as attacker,a1.uuid as NDE4,sample:sample(a2.uuid) as Sample_NDE4 " +
           " insert into found_host_charged1;"+
           ""+
           "@info(name = 'found_host_charged2') "+
           "from every a1 = found_host_charged1 " +
           "-> a2 = ATDEventStream[dns_answers != ''] "+
           "within 5 min "+
           "select UUID() as uuid,1008 as cid,a2.sensor_id as sensor_id,a2.interface_id as interface_id,a2.other_id as other_id,count(a2.uuid) as event_num,a1.first_seen  as first_seen,max(a2.timestamp) as last_seen,a2.dns_answers as IOC,a2.dst_ip as victim,a2.src_ip as attacker,a1.uuid as NDE5,sample:sample(a2.uuid) as Sample_NDE5 " +
           "insert into found_host_charged2; ";

This is part of my work,i use two stream,maybe you can get the data from StatusTable in your second stream.If not yet resolved,you can change StatusTable to S1.

Upvotes: 0

Related Questions