Reputation: 39
I am a rookie at both NiFi and Python and I need your help to pass the Flow File attribute value to the script. The script is converting a nested json into csv. When I run the script locally it works.
How can I pass the FlowFile name to src_json and tgt_csv?
Thanks,
Rosa
```python
import pandas as pd
import json
from pandas.io.json import json_normalize

src_json = "C:/Users/name/Documents/Filename.json"
tgt_csv = "C:/Users/name/Documents/Filename.csv"

jfile = open(src_json)
jdata = json.load(jfile)
# ...rest of the code...
```
Upvotes: 2
Views: 4737
Reputation: 14194
You have a few options to accomplish this task.

1. If you only need to convert JSON to CSV, NiFi provides a ConvertRecord processor which largely accomplishes this task. If your nested JSON is a problem or you have other reasons for wanting to do this in a Python script, continue below.
2. ExecuteScript (better for prototyping) and InvokeScriptedProcessor (more performant for production tasks) allow you to run Python (actually Jython) scripts inside the NiFi instance. This gives you direct access to some convenience methods and functionality. However, because Jython cannot handle natively-compiled Python libraries, you will not be able to use pandas in this code. See here for instructions on configuring this processor and here for why pandas will not work.
3. If you need pandas for some functionality, you'll need to save the script as a Python file on the local file system and invoke it as a shell command using ExecuteStreamCommand (if you need to provide input to this processor) or ExecuteProcess (if it's the first processor in your flow). These processors essentially run a shell command like python my_python_script_with_pandas.py -somearg (in ExecuteProcess) or python my_python_script_with_pandas.py with the flowfile content as STDIN (in ExecuteStreamCommand), and capture the output of STDOUT as the resulting flowfile content.

Currently your script is looking for the incoming JSON file in a static file location, and putting the resulting CSV in another static file location. You will need to change the script to do one of the following:
1. Take the paths as command-line arguments, e.g. -inputfile /path/to/some_existing_file.json -outputfile ${flowfile_attribute_named_output_file} or any combination thereof. Your script would then read the -inputfile and -outputfile arguments to determine the paths.
2. Read the flowfile content from STDIN (example here). Then process the JSON data, convert it to CSV, and return it via STDOUT. NiFi will consume this data, put it as the content of the resulting flowfile, and send it to the next processor(s) in your flow.

See also: an example of the ExecuteScript processor handling flowfile content in Python.
Upvotes: 1
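The two script changes suggested in the answer above (path arguments, or STDIN in and STDOUT out) could be combined in one script, roughly as sketched below. This is only a sketch under assumptions: the names flatten, convert, and main are made up for illustration, and convert is a standard-library stand-in for the elided pandas code in the question, so the example runs without extra packages. In a real script the body of convert would be replaced by the json_normalize logic.

```python
import argparse
import csv
import json
import sys


def flatten(obj, prefix=""):
    """Flatten nested dicts into a single dict with dotted keys."""
    flat = {}
    for key, value in obj.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            flat.update(flatten(value, f"{name}."))
        else:
            flat[name] = value
    return flat


def convert(jdata, out):
    """Write a JSON object (or list of objects) to `out` as CSV.

    Hypothetical stand-in for the pandas conversion in the question.
    """
    records = jdata if isinstance(jdata, list) else [jdata]
    rows = [flatten(record) for record in records]
    # The union of all keys, in first-seen order, becomes the header row.
    fields = list(dict.fromkeys(key for row in rows for key in row))
    writer = csv.DictWriter(out, fieldnames=fields, lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)


def main(argv=None, stdin=sys.stdin, stdout=sys.stdout):
    parser = argparse.ArgumentParser(description="Convert nested JSON to CSV")
    # Both arguments are optional: without them the script falls back to
    # STDIN/STDOUT, which is how ExecuteStreamCommand passes flowfile content.
    parser.add_argument("-inputfile", help="JSON path; omit to read STDIN")
    parser.add_argument("-outputfile", help="CSV path; omit to write STDOUT")
    args = parser.parse_args(argv)

    if args.inputfile:
        with open(args.inputfile) as jfile:
            jdata = json.load(jfile)
    else:
        jdata = json.load(stdin)

    if args.outputfile:
        with open(args.outputfile, "w", newline="") as csvfile:
            convert(jdata, csvfile)
    else:
        convert(jdata, stdout)
```

To run this as a script, call main() at the bottom of the file under the usual if __name__ == "__main__": guard. Called with no arguments, it reads the flowfile content from STDIN and writes the CSV to STDOUT. Called as python my_python_script_with_pandas.py -inputfile /path/to/some_existing_file.json -outputfile ${flowfile_attribute_named_output_file}, it uses the paths NiFi passes in.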