Reputation: 11
This is to get some clarity on the JSON mapping of the generated JSON events in WSO2 CEP.
I configured two buckets for this. I have a string (Suresh 7 LeadSE
) and I'm converting that into a JSON object. First bucket is getting the input string and I have written a siddhi extension to convert this into JSON.
The FirstBucket will get the input as String and converts it into a JSON and put it in a topic called parsedPacketTopic. Now I would like to get the individual elements from this JSON. I am trying to get this through the SecondBuket configuration. However, I don't know how to map the generated JSON value in the SecondBucket.
I am getting null values for the fields expInYears
, empName
, position
and I don't know how exactly to map the generated JSON to these fields.
Can anyone help on this?
FirstBucket configuration
<cep:input brokerName="localAgentBroker" topic="rawPacketTopic/1.0.0">
<cep:tupleMapping queryEventType="Tuple" stream="rawPacketStream">
<cep:property inputDataType="payloadData" inputName="rawPacket"
name="rawPacket" type="java.lang.String"/>
</cep:tupleMapping>
</cep:input>
<cep:query name="Queryfirst">
<cep:expression><![CDATA[from rawPacketStream[rawPacket!="null"]
insert into parsePacketStream customExtn:testFun(rawPacket) as pac]]>
</cep:expression>
<cep:output brokerName="activemqJmsBroker" topic="parsedPacketTopic">
<cep:mapMapping>
<cep:property name="parsedPac" valueOf="pac"/>
</cep:mapMapping>
</cep:output>
</cep:query>
Stream Definition of rawPacketTopic
{"streamId":"rawPacketTopic:1.0.0","name":"rawPacketTopic","version":"1.0.0","nickName":"PVT_Data","description":"PVT_Data","metaData":[{"name":"clientType","type":"STRING"}],"payloadData":[{"name":"rawPacket","type":"STRING"}]}
Stream Definition of parsedPacketTopic
{"streamId":"parsedPacketTopic:1.0.0","name":"parsedPacketTopic","version":"1.0.0","description":"PVTsinJson","metaData":[{"name":"ClientType","type":"STRING"}],"payloadData":[{"name":"parsedPac","type":"STRING"},
{"name":"expInYears","type":"INT"},{"name":"empName","type":"STRING"},{"name":"position","type":"STRING"}]}
i am getting the parsedPac json value as {"expInYears":7,"empName":"Suresh","position":"LeadSE"}
SecondBucket Configuration
<cep:input brokerName="activemqJmsBroker" topic="parsedPacketTopic">
<cep:mapMapping queryEventType="Tuple" stream="parsedPacketStream">
<cep:property inputDataType="payloadData" inputName="parsedPac" name="parsedPac" type="java.lang.String"/>
<cep:property inputDataType="payloadData" inputName="expInYears" name="expInYears" type="java.lang.Integer"/>
<!--<cep:property inputDataType="payloadData" inputName="empName" name="empName" type="java.lang.String"/>
<cep:property inputDataType="payloadData" inputName="position" name="position" type="java.lang.String"/>-->
</cep:mapMapping>
</cep:input>
<cep:query name="SecondQuery">
<cep:expression><![CDATA[from parsedPacketStream[parsedPac !="null"]
insert into displayPacketStream * ]]></cep:expression>
<cep:output brokerName="activemqJmsBroker" topic="displayTopic">
<!--<cep:mapMapping>
<cep:property name="expInYears" valueOf="expInYears"/>
<cep:property name="empName" valueOf="empName"/>
<cep:property name="position" valueOf="position"/>
</cep:mapMapping> -->
<cep:textMapping>Experience is - {expInYears}</cep:textMapping>
</cep:output>
</cep:query>
Stream Definition of displayTopic
{"streamId":"displayTopic:1.0.0","name":"displayTopic","version":"1.0.0","description":"PVTsinJson","metaData":[{"name":"ClientType","type":"STRING"}],
"payloadData":[{"name":"expInYears","type":"INT"},{"name":"empName","type":"STRING"},{"name":"position","type":"STRING"}]}
Upvotes: 0
Views: 1072
Reputation: 686
I think you are attempting JSON mapping which is not directly supported in WSO2 CEP 2.1.0.
If I understood your question correctly, I think you are converting the raw input into JSON from the extension in first cep bucket and then writing the second query based on that. However, since JSON input mapping is not directly supported, the second query will only see the whole string and not a JSON object.
Is there a specific requirement to convert the raw input to JSON in your scenario?
If not, using your custom extension, you can convert it to a CEP 2.1.0 supported format other than JSON (such as Map, Tuple, XML) which you should be able to process without any issues.
Another approach might be to send JSON converted events via the REST API as in documentation sample provided in [1]. This API will convert the JSON events to Tuple Events by default.
Anyway, JSON Input mapping will be supported from the next CEP 3.0.0 release.
[1] http://docs.wso2.org/wiki/display/CEP210/Build+Analyzer
HTH,
Upvotes: 1