Nicko

Reputation: 19

Apache NiFi: ExecuteStreamCommand generating two flow files

I'm currently running into a problem with the Apache NiFi ExecuteStreamCommand processor using Python. I have a script that reads a CSV, converts it into pandas DataFrames, and then into JSON. The script splits the CSV file into several DataFrames because of inconsistent column naming. My current script looks as follows:

import pandas as pd
import sys

# read the incoming flow file content from stdin
lines = sys.stdin.readlines()

# searching for subfiles and saving them to a list called files ...

appendDataFrames = []

for i in range(len(files)):
    df = pd.DataFrame(files[i])
    # several improvements of DataFrame ...
    appendDataFrames.append(df)

output = pd.concat(appendDataFrames)
JSONOutPut = output.to_json(orient='records', date_format='iso', date_unit='s')
sys.stdout.write(JSONOutPut)

In the queue to my next processor I can now see one flow file containing the JSON (as expected). My question is: is it possible to write each JSON to a separate flow file, so that the next processor can work on them separately? I need this because the next processor is an InferAvroSchema, and since the JSONs all have different schemas, a single combined flow file won't work. Am I mistaken? Or is there a way to solve this?

The code below won't work, since everything still ends up in the same flow file and my InferAvroSchema can't handle the documents separately.

import pandas as pd
import sys

lines = sys.stdin.readlines()

# searching for subfiles and saving them to a list called files ...

appendDataFrames = []

for i in range(len(files)):
    df = pd.DataFrame(files[i])
    # several improvements of DataFrame ...
    JSONOutPut = df.to_json(orient='records', date_format='iso', date_unit='s')
    sys.stdout.write(JSONOutPut)

Thanks in advance!

Upvotes: 0

Views: 1051

Answers (2)

Nicko

Reputation: 19

I just modified my code as follows:

import pandas as pd
import sys

lines = sys.stdin.readlines()

# searching for subfiles and saving them to a list called files ...

appendDataFrames = []

for i in range(len(files)):
    df = pd.DataFrame(files[i])
    # several improvements of DataFrame ...
    JSONOutPut = df.to_json(orient='records', date_format='iso', date_unit='s')
    sys.stdout.write(JSONOutPut)
    # write a delimiter after each JSON document so it can be split downstream
    sys.stdout.write("#;#")

And added a SplitContent processor configured like this:

[Screenshot: properties of the SplitContent processor]
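The screenshot presumably shows something along these lines (a sketch based on NiFi's standard SplitContent properties, not the exact values from the image):

Byte Sequence Format: Text
Byte Sequence: #;#
Keep Byte Sequence: false

With Keep Byte Sequence set to false, the #;# marker is stripped during splitting, so each resulting flow file contains only a single JSON document.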

Upvotes: 0

daggett

Reputation: 28644

With ExecuteStreamCommand you can't split the output, because the only thing you can do is write to stdout.

However, you could write some delimiter into the output and use a SplitContent processor with the same delimiter as the next processor.
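A minimal sketch of that idea (assuming, as in the question, that files already holds the pieces of the CSV and that the delimiter #;# never occurs inside the JSON itself):

import pandas as pd
import sys

DELIMITER = "#;#"  # must match the Byte Sequence configured in SplitContent

for piece in files:  # files: the list built from the CSV, as in the question
    df = pd.DataFrame(piece)
    sys.stdout.write(df.to_json(orient='records', date_format='iso', date_unit='s'))
    sys.stdout.write(DELIMITER)

SplitContent then cuts the single output flow file at every occurrence of the delimiter, producing one flow file per JSON document for InferAvroSchema to consume.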

Upvotes: 1
