Reputation: 31
Hi, I need to implement the following de-duplication logic in Siddhi stream processing. Assume I have an InputStream, and I want to produce the OutputStream as follows:
(1) When an event is the first one in the InputStream (since the event processing engine started), insert it into the OutputStream.
(2) If an event with the same signature (for example, the same event name) arrives within a 2-minute window, we consider it identical and should NOT insert it into the OutputStream. Otherwise, we should insert it into the OutputStream.
I tried to use an event pattern to do the filtering. However, I cannot find a way to express negation in Siddhi, that is, if (not (e1 --> e2 with the same signature in a 2-minute window)). Is there a clever way to perform such event de-duplication? Note that event de-duplication is a very common requirement in event processing.
If I were to implement it in Java, it would be relatively straightforward. I would create a hash table. When the first event arrives, I register it in the hash table and set its acceptable time to 2 minutes later. When the next event arrives, I look it up in the hash table and compare the registered acceptable time with the current event time; if the current event time is earlier than the acceptable time, I do not treat it as an output event. Rather than the Java implementation, I would prefer a declarative solution written as a Siddhi stream processing query, if that is possible.
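For reference, the hash-table idea above can be sketched roughly as follows. This is only a minimal illustration, not Siddhi code: the class name EventDeduplicator, the method accept, and the use of millisecond timestamps are all assumptions made for the example, and it assumes that an event accepted after the window has elapsed starts a new 2-minute window.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of Siddhi): drops events whose signature was
// already emitted less than windowMillis ago.
final class EventDeduplicator {
    private final long windowMillis;
    // signature -> earliest event time at which the next event is accepted
    private final Map<String, Long> acceptableTime = new HashMap<>();

    EventDeduplicator(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Returns true if the event should be inserted into the output stream.
    boolean accept(String signature, long eventTimeMillis) {
        Long notBefore = acceptableTime.get(signature);
        if (notBefore != null && eventTimeMillis < notBefore) {
            return false; // duplicate inside the window: suppress it
        }
        // First occurrence, or the window has elapsed: emit and restart the window.
        acceptableTime.put(signature, eventTimeMillis + windowMillis);
        return true;
    }
}
```

With a window of 120000 ms, an event "login" at time 0 would be accepted, a second "login" at 60000 would be suppressed, and one at 120000 would be accepted again. Note that this sketch never removes old entries, so the map grows with the number of distinct signatures.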
Upvotes: 0
Views: 208
Reputation: 1654
You can achieve that with an in-memory table. Please find the sample below; it is quite similar to your Java approach.
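define stream InputStream (event_id string, data string);
define stream OutputStream (event_id string, data string);
define table ProcessedEvents (event_id string);

-- Forward an event only if its event_id is not already in the table.
from InputStream[not((ProcessedEvents.event_id == event_id) in ProcessedEvents)]
insert into OutputStream;

-- Remember every forwarded event_id.
from OutputStream
select event_id
insert into ProcessedEvents;

-- Once the 2-minute de-duplication window has passed, expire the event_id ...
from OutputStream#window.time(2 min)
select event_id
insert expired events into PurgeStream;

-- ... and remove it from the table so the next occurrence passes through.
from PurgeStream
delete ProcessedEvents
on ProcessedEvents.event_id == event_id;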
Upvotes: 0