Reputation: 41
With NiFi I generate flowfiles data and I retrieve groovy script files by passing in parameter the filenames of the data flowfiles I have, I would like to know how I can execute the groovy scripts (file) one by one for each data flowfile??!!
Thank you for help
Upvotes: 0
Views: 926
Reputation: 41
@daggett This is example of data in 3 flowfiles for each table:
Table_CCL
+0002150736GG+0801448361KL 0001-01-01SSS+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000-000000000002860+000000000000000+000000000000000 1+0000000000000002023-05-23+0601148361
+0002152097NN+0901363467ML 0001-01-01RRR+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000011845+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000 1+0000000000000002023-05-23+0501463467
+0002155389PP+0001999097LL 0001-01-01XXX+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000-000000000002405+000000000000000+000000000000000 1+0000000000000002023-05-23+0401899097
Table_LCP
PX37115291656+0002004736XXL30518240 915292479 +0700156745+0700156745+0700156745D+00351 082023-05-072023-04-252023-05-012023-05-07 +000000000000000+000000000000000+00001302EUR+000000000005800+000000000005800+000000000005800+000.0000512023-05-23 21 N C50ED50 I014959 2023-05-231S+0000000000+0600076990+0000001000 +000000000000000+000000000000000 0 0FJUY_AV 5FX7SI+0800156745
PX37083105408+0002059840XXL30425660 914119751 +0700757773+0700757773+0700757773D+00351 082023-04-072023-03-242023-04-012023-04-07 +000000000000000+000000000000000+00001302EUR+000000000005298+000000000005298+000000000005298+000.0000842023-05-23 21 N C50ED50 I021948 2023-05-231S+0000000000+0600361406+0000001000 +000000000000000+000000000000000 0 0FVFR_AV 5FX7SI+0900757773
PX37093149567+0002169371XXL30505273 22022657S +0702188717+0702188717+0702188717D+00550 082023-04-092023-04-032023-04-032023-04-09 +000000000000000+000000000000000+00001302EUR+000000000372598+000000000310498+000000000000000+000.0000552023-05-23 20 N C50ED50 I014959 2023-05-231S+0000000000+0600960024+0000001000 +000000000000000+00000000000000022022657S 0 0FATR_AV 3 5FX7SI+0602188717
Table_LDC
PX37143489198+0000+0002175736LLM30003604 23200228S D+00804 2023-05-312023-05-31 +000000000043200+0000000000000003XXX+000000000043200+000000000043200+000000000000000+000.00000 C50ED50 2023-05-23 +00000S 0 0FACT_AV 082023-05-310 +005 0
PX37143489199+0000+0002003027LLL30633874 23210097S D+00350 2023-05-292023-05-29 +000000000002328+0000000000000003YYY+000000000002328+000000000002328+000000000002328+000.00001 C50ED50 C50ED50 2023-05-23 +00000S 0 0FACT_AV 082023-05-290 +005 0
and this is example of the script groovy to parse table LDC in data flowfile ( I have 3 differents script for each data of table to parse the files data in flowfile and convert to json format:
import groovy.json.JsonBuilder
import java.nio.charset.StandardCharsets
import java.time.LocalDate
import java.lang.Exception
import java.lang.String
import groovy.lang.ObjectRange
import java.time.format.DateTimeFormatter
import java.text.SimpleDateFormat
Number toNumber( String line, IntRange range) {
try {
String v = line[range]?.trim()
return v ? v.toBigDecimal() : null
} catch( Exception ex ) {
throw new Exception("Encountered exception at ${range} in line ${line}: ${ex}", ex)
}
}
def flowFile = session.get()
if (flowFile) {
def datePattern = new SimpleDateFormat('yyyy-MM-dd')
def records = []
flowFile.read().withReader('ISO-8859-1') { reader ->
reader.eachLine { line ->
def record = [
SEQ_LD: 'ADMIN.SEQ_INFO.NEXTVAL',
ID_LIG_CPTE: line[0..12]?.trim() ?: '',
NO_ORD: toNumber(line, 13..17 ),
NO_CCL: toNumber(line, 18..28 ),
CD_PA: line[29..30]?.trim() ?: '',
ID_MVT: line[31..45]?.trim() ?: '',
NO_CNT: line[46..56]?.trim() ?: '',
CD_SENS: line[57..57]?.trim() ?: '',
ID_INTRN_MVT: toNumber(line, 58..63 ),
SS_ID_INTRN_MVT: line[64..65]?.trim() ?: '',
DT_EXBL: line[66..75]?.trim() != '0001-01-01' ? datePattern.parse(line[66..75]?.trim()) : null,
DT_VAL_CLI: line[76..85]?.trim() != '0001-01-01' ? datePattern.parse(line[76..85]?.trim()) : null,
DT_PEC_CAL_IR: line[86..95]?.trim() != '0001-01-01' ? datePattern.parse(line[86..95]?.trim()) : null,
MT_SOL_MVT: toNumber(line, 96..111 ),
MT_SOL_AST_IR: toNumber(line, 112..127 ),
STA_MVT: line[128..128]?.trim() ?: '',
CD_DEV: line[129..131]?.trim() ?: '',
MT_MVT: toNumber(line, 132..147 ),
MT_MVT_HT: toNumber(line, 148..163 ),
MT_AST_IR: toNumber(line, 164..179 ),
TX_TVA_CCL: toNumber(line, 180..188 ),
IND_PEC_CAL_IR: line[189..189]?.trim() ?: '',
ID_ACO: line[190..202]?.trim() ?: '',
CD_TY_ETA: line[203..204]?.trim() ?: '',
CD_ETA: line[205..205]?.trim() ?: '',
ID_COLLA_CRE: line[206..213]?.trim() ?: '',
ID_COLLA_MAJ: line[214..221]?.trim() ?: '',
DT_CRE_CCL: line[222..231]?.trim() != '0001-01-01' ? datePattern.parse(line[222..231]?.trim()) : null,
DT_MAJ: line[232..241]?.trim() != '0001-01-01' ? datePattern.parse(line[232..241]?.trim()) : null,
NO_PRI_LTR_ACO: toNumber(line, 242..247 ),
ID_INTRN_PROD: line[248..248]?.trim() ?: '',
DT_TRT: datePattern.parse('2023-06-05'),
CD_SOC_FOUR: line[259..262]?.trim() ?: '',
IND_REPRIS_STD: line[263..263]?.trim() ?: '',
CD_FAM_MVT_DEBI: line[264..273]?.trim() ?: '',
MOD_PAI: line[274..275]?.trim() ?: '',
DT_ECH_FACT: line[276..285]?.trim() != '0001-01-01' ? datePattern.parse(line[276..285]?.trim()) : null,
NO_ECH: line[286..288]?.trim() ?: '',
CD_ODM: line[289..289]?.trim() ?: '',
CD_MOT_ODM: line[290..292]?.trim() ?: '',
CD_MOT_REMB: line[293..300]?.trim() ?: '',
DEL_IMP: toNumber(line, 301..304 ),
IND_IMP: line[305..305]?.trim() ?: '',
DT_VAL_IMP: line[306..315]?.trim() != '0001-01-01' ? datePattern.parse(line[306..315]?.trim()) : null,
DT_EVT_IMP: line[316..325]?.trim() != '0001-01-01' ? datePattern.parse(line[316..325]?.trim()) : null,
DT_RETR_IMP: line[326..335]?.trim() != '0001-01-01' ? datePattern.parse(line[326..335]?.trim()) : null,
MOT_IMP: line[336..337]?.trim() ?: '',
IND_PEC_EXBL: line[338..338]?.trim() ?: ''
]
records << record
}
}
def json = new JsonBuilder(records)
def jsonBytes = json.toString().getBytes(StandardCharsets.UTF_8)
flowFile = session.write(flowFile, { outputStream ->
outputStream.write(jsonBytes)
} as OutputStreamCallback)
flowFile = session.putAttribute(flowFile, 'filename', 'ti_ldc.json')
session.transfer(flowFile, REL_SUCCESS)
}
The result in output must be 3 Flowfile data processed by groovy scripts as input.
Upvotes: 0