Reputation: 1
I have a requirement to execute a complex json processing. I have almost completed but struck with a logic of filtering the events based on an attribute. I know filtering on a normal basis but this is a case where tokenizeAsObject function is written next to the from statement. Where and how he filtering part should be written in this case? Your help is greatly appreciated. I could find normal filtering for queries but couldn' find anything having tokenizing method in the querying/analysing data part of the code.
--THE CODE GOES HERE
@App:name('CompanyClientDetailingApp3')
@App:description('Description of the client details of the company')
@source(type='http', receiver.url='http://localhost:5005/clientDetails',
@map(type='json', @attributes(json = '$')
)
)
define stream CompanyClientStream3 (json string);
@sink(type = 'log',
@map(type = 'passThrough'))
define stream EachProjectStream3 (Client string, clientContractTerm string, projectName string, projectContractTerm int);
@info(name = 'clientProjectquery')
from CompanyClientStream3#json:tokenizeAsObject(json, '$.CompanyClients')
select json:getString(jsonElement, '$.Client') as Client, json:getString(jsonElement, '$.Invoice.ContractTerm') as clientContractTerm, json:getObject(jsonElement, '$.Invoice.Projects') as projectList
insert into EachClientProjectStream3;
@info(name = 'projectSttreamQuery')
from EachClientProjectStream3#json:tokenizeAsObject(projectList, "$")
select Client, clientContractTerm, json:getString(jsonElement, '$.ProjectName') as projectName, json:getInt(jsonElement, '$.ProjectTerm') as projectContractTerm
insert into EachProjectStream3;
Filtering is based on ProjectTerm. i.e. Projects having Projecterm > 5 years must be streamed out.
--Inputs for the same
{
"CompanyClients": [
{
"Client": "C1",
"Invoice": {
"ContractTerm": "5",
"Unit": "years",
"Projects": [
{"ProjectName":"C1P1", "ProjectTerm":"5", "TermUnit": "years"},
{"ProjectName":"C1P2", "ProjectTerm":"3", "TermUnit": "years"},
{"ProjectName":"C1P3", "ProjectTerm":"2", "TermUnit": "years"}
]
}
},
{
"Client": "C3",
"Invoice": {
"ContractTerm": "10",
"Unit": "years",
"Projects": [
{"ProjectName":"C3P1", "ProjectTerm":"8", "TermUnit": "years"},
{"ProjectName":"C3P2", "ProjectTerm":"5", "TermUnit": "years"},
{"ProjectName":"C3P3", "ProjectTerm":"6", "TermUnit": "years"}
]
}
}
]
}
Thanks, Kaushik.
Upvotes: 0
Views: 83
Reputation: 1445
Output from the EachProjectStream3 can be filtered in the consecutive query,
from EachProjectStream3[projectContractTerm < 5]
slect *
insert into FilteredStream;
Upvotes: 0