Reputation: 73
I have to process 10 billion nested JSON records per day using NiFi (version 1.9). As part of the job, I am trying to convert the nested JSON to CSV using a Groovy script. I referred to the Stack Overflow questions below, which cover the same topic, and came up with the code below.
Groovy collect from map and submap
how to convert json into key value pair completely using groovy
However, I am not sure how to retrieve the values of duplicate keys. A sample JSON record is defined in the variable "json" in the code below. The key "Flag1" appears in multiple sections (i.e., "OF" and "SF"). I want to get the output as CSV.
Below is the output when I execute the Groovy code below: 2019-10-08 22:33:29.244000,v12,-,36178,0,0/0,10.65.5.56,sf,sf (the value of the "Flag1" key is replaced by the value of its last occurrence).
I am not an expert in Groovy. Please also suggest any better approach, and I will give it a try.
import groovy.json.*
// Sample record. The literal closes all three levels (section, transaction, root);
// without the final brace the '}}}' replacement below never matches.
def json = '{"transaction":{"TS":"2019-10-08 22:33:29.244000","CIPG":{"CIP":"10.65.5.56","CP":"0"},"OF":{"Flag1":"of","Flag2":"-"},"SF":{"Flag1":"sf","Flag2":"-"}}}'
// Wrap the single transaction object in an array so it can be iterated as a list of rows.
def jsonReplace = json.replace('{"transaction":{','{"transaction":[{').replace('}}}','}}]}')
def jsonRecord = new JsonSlurper().parseText(jsonReplace)
// Columns to emit, in order. "Flag1" appears twice because the record carries it
// under both "OF" and "SF".
def columns = ["TS","V","PID","RS","SR","CnID","CIP","Flag1","Flag1"]
// Recursively collapses a nested map into a single-level map keyed by leaf name.
// Nested maps, and collections made up solely of maps, are merged into the result;
// any other value is stored under its own key. Leaf names that occur in several
// sub-maps overwrite each other, so the last occurrence wins.
def flatten
flatten = { row ->
    row.inject([:]) { acc, name, value ->
        if (value instanceof Map) {
            acc.putAll(flatten(value))
        } else if (value instanceof Collection && value.every { it instanceof Map }) {
            value.each { child -> acc.putAll(flatten(child)) }
        } else {
            acc[name] = value
        }
        acc
    }
}
// Flatten each transaction once and then pick the requested columns from the result;
// flattening inside the per-column closure repeated the same work for every column.
print "output: " + jsonRecord.transaction.collect { row ->
    def flat = flatten(row)
    columns.collect { colName -> flat[colName] }.join(',')
}.join('\n')
Edit: Based on the replies from @cfrick and @stck, I have tried the suggested option and have a follow-up question below.
@cfrick and @stck- Thanks for your response.
import groovy.json.JsonSlurper
/**
 * Scripted NiFi processor that converts newline-delimited JSON records of the form
 * {"transaction": {...}} into CSV, one output row per transaction.
 *
 * The flow file content is rewritten in place: a header line built from COLUMNS is
 * written first, followed by one CSV row per input record.
 */
class customJSONtoCSV implements Processor {
    def REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles that were successfully processed").build();
    def log

    /**
     * Output columns, in order. This is the single source for both the header line and
     * the per-row lookup, so the two cannot drift apart. Nested fields are named
     * "<parent>_<field>", matching the keys produced by flatten().
     */
    static final List<String> COLUMNS = [
        "CIPG_CIP","CIPG_CP","CIPG_SLP","CIPG_SLEP","CIPG_CVID",
        "SIPG_SIP","SIPG_SP","SIPG_InP","SIPG_SVID",
        "TG_T","TG_R","TG_C","TG_SDL",
        "DL","I_R","UAP","EDBL","Ca","A","RQM","RSM","FIT","CSR",
        "OF_Flag1","OF_Flag2","OF_Flag3","OF_Flag4","OF_Flag5","OF_Flag6","OF_Flag7","OF_Flag8","OF_Flag9","OF_Flag10",
        "OF_Flag11","OF_Flag12","OF_Flag13","OF_Flag14","OF_Flag15","OF_Flag16","OF_Flag17","OF_Flag18","OF_Flag19","OF_Flag20",
        "OF_Flag21","OF_Flag22","OF_Flag23",
        "SF_Flag1","SF_Flag2","SF_Flag3","SF_Flag4","SF_Flag5","SF_Flag6","SF_Flag7","SF_Flag8","SF_Flag9","SF_Flag10",
        "SF_Flag11","SF_Flag12","SF_Flag13","SF_Flag14","SF_Flag15","SF_Flag16","SF_Flag17","SF_Flag18","SF_Flag19","SF_Flag20",
        "SF_Flag21","SF_Flag22","SF_Flag23","SF_Flag24",
        "GF_Flag1","GF_Flag2","GF_Flag3","GF_Flag4","GF_Flag5","GF_Flag6","GF_Flag7","GF_Flag8","GF_Flag9","GF_Flag10",
        "GF_Flag11","GF_Flag12","GF_Flag13","GF_Flag14","GF_Flag15","GF_Flag16","GF_Flag17","GF_Flag18","GF_Flag19","GF_Flag20",
        "GF_Flag21","GF_Flag22","GF_Flag23","GF_Flag24","GF_Flag25","GF_Flag26","GF_Flag27","GF_Flag28","GF_Flag29","GF_Flag30",
        "GF_Flag31","GF_Flag32","GF_Flag33","GF_Flag34","GF_Flag35",
        "VSL_VSID","VSL_TC","VSL_MTC","VSL_NRTC","VSL_ET","VSL_HRES","VSL_VRES","VSL_FS","VSL_FR","VSL_VSD","VSL_ACB","VSL_ASB","VSL_VPR","VSL_VSST",
        "HRU_HM","HRU_HD","HRU_HP","HRU_HQ",
        "URLF_CID","URLF_CGID","URLF_CR","URLF_RA","URLF_USM","URLF_USP","URLF_MUS",
        "TCPSt_WS","TCPSt_SE","TCPSt_WSFNS","TCPSt_WSF","TCPSt_EM","TCPSt_RSTE","TCPSt_MSS",
        "NS_OPID","NS_ODID","NS_EPID","NS_TrID","NS_VSN","NS_LSUT","NS_STTS","NS_TCPPR",
        "CQA_NL","CQA_CL","CQA_CLC","CQA_SQ","CQA_SQC",
        "TS","V","PID","RS","SR","CnID","A_S","OS","CPr","CVB","CS","HS","SUNR","SUNS","ML","MT","TCPSL","CT","MS","MSH",
        "SID","SuID","UA","DID","UAG","CID","HR","CRG","CP1","CP2","AIDF","UCB","CLID","CLCL","OPTS","PUAG","SSLIL"
    ].asImmutable()

    /**
     * Flattens a nested map into a single-level map of String keys to String values.
     *
     * A leaf inside a nested map is stored as "<parent>_<leaf>", where <parent> is the
     * name of the immediately enclosing map. Only the immediate parent is used (deeper
     * ancestors are not part of the key), which is the naming scheme COLUMNS relies on.
     *
     * @param row    map to flatten
     * @param prefix name of the enclosing map, or "" at the top level
     * @return a new HashMap with one entry per leaf value
     */
    static def flatten(row, prefix="") {
        def flattened = new HashMap<String, String>()
        row.each { String k, Object v ->
            def key = prefix ? prefix + "_" + k : k;
            if (v instanceof Map) {
                flattened.putAll(flatten(v, k))
            } else {
                flattened.put(key, v.toString())
            }
        }
        return flattened
    }

    /**
     * Renders one flattened record as a CSV line (without line terminator).
     *
     * Columns are emitted in COLUMNS order; a column the record does not contain is
     * written as an empty field.
     *
     * @param row flattened record, keyed by column name
     * @return the comma-separated line
     */
    static def toCSVRow(HashMap row) {
        return COLUMNS.collect { column ->
            escapeCsv(row.containsKey(column) ? row.get(column) : "")
        }.join(',')
    }

    /**
     * Quotes a field when it contains a comma, a double quote or a line break, doubling
     * embedded quotes, so that such values cannot shift or split columns.
     */
    private static String escapeCsv(Object value) {
        // String concatenation renders null as "null", exactly like join(',') would.
        String text = "" + value
        if (text.contains(',') || text.contains('"') || text.contains('\n') || text.contains('\r')) {
            return '"' + text.replace('"', '""') + '"'
        }
        return text
    }

    @Override
    void initialize(ProcessorInitializationContext context) {
        log = context.getLogger()
    }

    @Override
    Set<Relationship> getRelationships() {
        return [REL_SUCCESS] as Set
    }

    /**
     * Takes one flow file, replaces its content with the CSV rendering of its JSON lines
     * and routes it to REL_SUCCESS. On any failure the session is rolled back and the
     * error is rethrown as a ProcessException.
     */
    @Override
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        // Created outside the try block so the catch block can roll it back.
        def session = sessionFactory.createSession()
        try {
            def flowFile = session.get()
            if (!flowFile) return
            flowFile = session.write(flowFile,
                { inputStream, outputStream ->
                    def bufferedReader = new BufferedReader(new InputStreamReader(inputStream, 'UTF-8'))
                    // One parser instance is reused for every line.
                    def jsonSlurper = new JsonSlurper()
                    outputStream.write((COLUMNS.join(',') + '\n').getBytes('UTF-8'))
                    String line
                    // Compare against null explicitly: an empty line is falsy in Groovy and
                    // would otherwise end the loop and silently drop the rest of the file.
                    while ((line = bufferedReader.readLine()) != null) {
                        if (!line.trim()) continue
                        def transaction = jsonSlurper.parseText(line).transaction
                        // Accept both a single transaction object and an array of them,
                        // without rewriting the raw JSON text.
                        def transactions = transaction instanceof Collection ? transaction : [transaction]
                        def csvRows = transactions.collect { row -> toCSVRow(flatten(row)) }
                        if (csvRows) {
                            // Join explicitly; interpolating the list would write its
                            // toString() form, i.e. the row wrapped in [ ].
                            outputStream.write((csvRows.join('\n') + '\n').getBytes('UTF-8'))
                        }
                    }
                } as StreamCallback)
            session.transfer(flowFile, REL_SUCCESS)
            session.commit()
        }
        catch (e) {
            // Undo any partial work so the flow file is returned to its queue.
            session.rollback(true)
            throw new ProcessException(e)
        }
    }

    @Override
    Collection<ValidationResult> validate(ValidationContext context) { return null }

    @Override
    PropertyDescriptor getPropertyDescriptor(String name) { return null }

    @Override
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }

    @Override
    List<PropertyDescriptor> getPropertyDescriptors() {
        return [] as List
    }

    @Override
    String getIdentifier() { return null }
}
// Expose the processor instance through the script-level "processor" variable,
// which is where NiFi's InvokeScriptedProcessor looks for it (see NiFi documentation).
processor = new customJSONtoCSV()
Upvotes: 1
Views: 1170
Reputation: 37008
If you know exactly what you want to extract (and given that you want to generate a CSV from it), IMHO you are way better off just shaping the data the way you later want to consume it. E.g.:
// Sample input: a list of transaction records.
def data = new groovy.json.JsonSlurper().parseText('[{"TS":"2019-10-08 22:33:29.244000","CIPG":{"CIP":"10.65.5.56","CP":"0"},"OF":{"Flag1":"of","Flag2":"-"},"SF":{"Flag1":"sf","Flag2":"-"}}]')
// One closure per output column, in column order. Declared without `def` so that it
// lands in the script binding and is visible inside extract(). Nested lookups use safe
// navigation so a record that lacks a whole section yields null for those columns
// instead of throwing.
extractors = [
    { it.TS },
    { it.V },
    { it.PID },
    { it.RS },
    { it.SR },
    { it.CIPG?.CIP },
    { it.CIPG?.CP },
    { it.OF?.Flag1 },
    { it.SF?.Flag1 },
]
// Applies every extractor to one record and returns the values as a list (one row).
def extract(row) {
    extractors.collect { it(row) }
}
println(data.collect { extract it })
// ⇒ [[2019-10-08 22:33:29.244000, null, null, null, null, 10.65.5.56, 0, of, sf]]
As stated in the other answer, given the sheer amount of data you are trying to convert: avoid using `collect` (it is eager) to create the rows.
Upvotes: 1
Reputation: 82
The idea is to modify the "flatten" method: it should differentiate between identical nested keys by using the parent key as a prefix. I've simplified the code a bit:
import groovy.json.*
// Sample record; the literal closes all three levels (section, transaction, root).
def json = '{"transaction":{"TS":"2019-10-08 22:33:29.244000","CIPG":{"CIP":"10.65.5.56","CP":"0"},"OF":{"Flag1":"of","Flag2":"-"},"SF":{"Flag1":"sf","Flag2":"-"}}}'
def parsed = new JsonSlurper().parseText(json)
// Normalise to the {"transaction": [ ... ]} shape the rest of the script iterates over.
// Wrapping the parsed object avoids rewriting the raw text, where replacing every "}}"
// would also hit nested objects in the middle of a record.
def jsonRecord = [transaction: [parsed.transaction]]
/**
 * Flattens a nested map into a single-level map of String keys to String values.
 *
 * Each leaf is stored under its dot-separated path, e.g. {"OF": {"Flag1": "of"}}
 * becomes ["OF.Flag1": "of"], so equal leaf names in different sections do not
 * overwrite each other.
 *
 * @param row    map to flatten
 * @param prefix path of the enclosing map, or "" at the top level
 * @return a new HashMap with one entry per leaf value
 */
static def flatten(row, prefix="") {
    def flattened = new HashMap<String, String>()
    row.each { String k, Object v ->
        def key = prefix ? prefix + "." + k : k
        if (v instanceof Map) {
            // Pass the accumulated path down, not just k; otherwise leaves nested more
            // than two levels deep lose their outer ancestors and can collide again.
            flattened.putAll(flatten(v, key))
        } else {
            flattened.put(key, v.toString())
        }
    }
    return flattened
}
/**
 * Renders one flattened record as a single line, with fields separated by ", ".
 * Columns the record does not contain are written as empty fields.
 *
 * NOTE(review): "CIP" sits inside "CIPG" in the sample record, so flatten() stores it
 * under a prefixed key and this unprefixed column comes out empty.
 *
 * @param row flattened record, keyed by column name
 * @return the joined line, without a line terminator
 */
static def toCSVRow(HashMap row) {
    def columns = ["TS","V","PID","RS","SR","CnID","CIP","OF.Flag1","SF.Flag1"] // Last 2 keys have changed!
    def cells = []
    for (name in columns) {
        if (row.containsKey(name)) {
            cells << row.get(name)
        } else {
            cells << ""
        }
    }
    return cells.join(', ')
}
// Flatten every transaction, render it as one line and print the lines separated by newlines.
def a = jsonRecord.transaction
        .collect { transaction -> toCSVRow(flatten(transaction)) }
        .join('\n')
println a
Output would be:
2019-10-08 22:33:29.244000, , , , , , , of, sf
Upvotes: 0