Dae Woon Koo

Reputation: 1

Integrating WSO2 Siddhi CEP and Kafka

I'm currently in the process of integrating WSO2's Siddhi CEP and Kafka. I want to produce a Siddhi stream by receiving events from Kafka. The Kafka data being received is in JSON format, where each event looks something like this:

{  
   "event":{  
      "orderID":"1532538588320",
      "timestamps":[  
         15325,
         153
      ],
      "earliestTime":1532538
   }
}

The SiddhiApp that I'm trying to run in the WSO2 stream processor looks like this:

@App:name('KafkaSiddhi')
@App:description('Consume events from a Kafka Topic and print the output.')

-- Streams
@source(type='kafka', 
topic.list = 'order-aggregates',
partition.no.list = '0',
threading.option = 'single.thread',
group.id = 'time-aggregates',
bootstrap.servers = 'localhost:9092, localhost:2181',
@map(type='json'))
define stream TimeAggregateStream (orderID string, timestamps object, earliestTime long);

@sink(type="log")
define stream TimeAggregateResultStream (orderID string, timestamps object, earliestTime long);

-- Queries
from TimeAggregateStream 
select orderID, timestamps, earliestTime
insert into TimeAggregateResultStream;

Running this app should log every event published to the order-aggregates Kafka topic that I'm listening to. But I see no output whatsoever when I click Run.

I can tell that the WSO2 stream processor is interacting with the order-aggregates topic, because error messages are logged in real time whenever I run the application with data types in my stream schema that don't match the incoming data. The error messages look like this:

[2018-07-25_10-14-37_224] ERROR 
{org.wso2.extension.siddhi.map.json.sourcemapper.JsonSourceMapper} - 
Json message {"event":{"orderID":"210000000016183","timestamps": 
[1532538627000],"earliestTime":1532538627000}} contains incompatible 
attribute types and values. Value 210000000016183 is not compatible with 
type LONG. Hence dropping the message. (Encoded) 
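
For illustration, the kind of check behind this error can be approximated in a few lines of Python. This is only a sketch of the idea, not the JSON mapper's actual code: `fits` and `incompatible_attributes` are hypothetical names, and the type rules are my own assumptions. It uses the message from the log above with a schema that wrongly declares orderID as long.

```python
import json


def fits(value, siddhi_type):
    """Assumed compatibility rules between a JSON value and a stream attribute type."""
    if siddhi_type == "long":
        # bool is a subclass of int in Python, so rule it out explicitly
        return isinstance(value, int) and not isinstance(value, bool)
    if siddhi_type == "string":
        return isinstance(value, str)
    return True  # in this sketch, "object" accepts any value


def incompatible_attributes(message, schema):
    """Return the attribute names whose value does not fit the declared type."""
    event = json.loads(message)["event"]
    return [name for name, siddhi_type in schema.items()
            if not fits(event.get(name), siddhi_type)]


message = ('{"event":{"orderID":"210000000016183",'
           '"timestamps":[1532538627000],"earliestTime":1532538627000}}')

wrong_schema = {"orderID": "long", "timestamps": "object", "earliestTime": "long"}
right_schema = {"orderID": "string", "timestamps": "object", "earliestTime": "long"}

print(incompatible_attributes(message, wrong_schema))   # ['orderID']
print(incompatible_attributes(message, right_schema))   # []
```

With the wrong schema the quoted orderID is flagged, which matches the log message; with the corrected schema nothing is flagged, so a type mismatch alone does not explain the missing output.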

However, when I have the schema set up correctly, I receive no output at all when I run the application, and no errors either. I really don't know how to make sense of this. When I try to debug it by putting a breakpoint on the line containing 'insert into', the debugger never stops at that line.

Can anyone offer some insight on how to approach this issue?

Upvotes: 0

Views: 509

Answers (1)

Ramindu De Silva

Reputation: 3

We added support for the object type to the JSON mapper extension in its latest release. Please download the extension [1] and replace the siddhi-map-json jar in /lib.

[1] https://store.wso2.com/store/assets/analyticsextension/details/0e6a6b38-f1d1-49f5-a685-e8c16741494d

Upvotes: 1
