RosaNegra

Reputation: 39

How to pass flow files to the Execute Python script and use attributes & NiFi variables to store that file?

I am a rookie at both NiFi and Python, and I need your help passing a FlowFile attribute value to the script. The script converts a nested JSON into CSV. When I run the script locally, it works.

How can I pass the FlowFile name to src_json and tgt_csv?

Thanks,

Rosa

```python
import pandas as pd
import json
from pandas.io.json import json_normalize

src_json = "C:/Users/name/Documents/Filename.json"
tgt_csv = "C:/Users/name/Documents/Filename.csv"

jfile = open(src_json)
jdata = json.load(jfile)

# ...rest of the code...
```

Upvotes: 2

Views: 4737

Answers (1)

Andy

Reputation: 14194

You have a few options to accomplish this task.

  1. As Arun211 pointed out, there is an existing ConvertRecord processor which largely accomplishes this task. If your nested JSON is a problem or you have other reasons for wanting to do this in a Python script, continue below.
  2. If you have an existing Python script which performs this task as shown above, you'll need to invoke it from NiFi while providing the data to the script. You can use one of the following:
    1. ExecuteScript (better for prototyping) or InvokeScriptedProcessor (more performant for production tasks) lets you run Python (actually Jython) scripts inside the NiFi instance, giving you direct access to some convenience methods and functionality. However, because Jython cannot handle natively-compiled Python libraries, you will not be able to use pandas in this code. See here for instructions on configuring this processor and here for why pandas will not work.
    2. If you need pandas for some functionality, you'll need to save the script as a Python file on the local file system and invoke it as a shell command using ExecuteStreamCommand (if you need to provide input to the processor) or ExecuteProcess (if it's the first processor in your flow). These processors essentially run a shell command like python my_python_script_with_pandas.py -somearg (in ExecuteProcess), or python my_python_script_with_pandas.py with the flowfile content on STDIN (in ExecuteStreamCommand), with whatever the script writes to STDOUT captured as the resulting flowfile content.
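For the ExecuteStreamCommand route, the script only has to read STDIN and write STDOUT. A minimal sketch of the shape such a script could take, using only the standard library (a hypothetical `flatten` helper stands in for `json_normalize`; in your real script the pandas calls would go in its place):

```python
import csv
import io
import json


def flatten(obj, prefix=""):
    # Flatten one level of nesting into dotted keys -- a minimal
    # stand-in for what pandas.json_normalize does.
    flat = {}
    for key, value in obj.items():
        name = prefix + key
        if isinstance(value, dict):
            flat.update(flatten(value, name + "."))
        else:
            flat[name] = value
    return flat


def json_to_csv(text):
    # Accept either a single JSON object or a list of objects.
    records = json.loads(text)
    if isinstance(records, dict):
        records = [records]
    rows = [flatten(r) for r in records]
    # Collect the union of all column names, preserving first-seen order.
    fieldnames = []
    for row in rows:
        for key in row:
            if key not in fieldnames:
                fieldnames.append(key)
    out = io.StringIO()
    writer = csv.DictWriter(out, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(rows)
    return out.getvalue()


# When invoked by ExecuteStreamCommand, the entry point would simply be:
#     import sys
#     sys.stdout.write(json_to_csv(sys.stdin.read()))
# NiFi pipes the flowfile content in on STDIN and captures STDOUT as the
# new flowfile content.
```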

Currently your script is looking for the incoming JSON file in a static file location, and putting the resulting CSV in another static file location. You will need to change the script to do one of the following:

  1. Read those paths from command-line arguments and pass those in the relevant processor property in the processor you select. These properties can be populated from flowfile attributes, so you could do something like Command Arguments: -inputfile /path/to/some_existing_file.json -outputfile ${flowfile_attribute_named_output_file} or any combination thereof. Your script would then read the -inputfile and -outputfile arguments to determine the paths.
  2. Read the incoming data directly from STDIN (example here). Then process the JSON data, convert it to CSV, and return it via STDOUT. NiFi will consume this data, put it as the content of the resulting flowfile, and send it to the next processor(s) in your flow.
  3. The prior two options keep your Python script independent from NiFi; it is unaware of any of the "flowfile" constructs. This option will make it NiFi-specific, but allow further functionality (see option 2.1 above). To write Python code that reads and writes directly from/to the flowfile content, see this example of ExecuteScript processor handling flowfile content in Python.
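For option 1, the command-line side of the script can be sketched with `argparse`. The `-inputfile`/`-outputfile` flag names are just the hypothetical ones from the example above; use whatever names you configure in the processor's Command Arguments property:

```python
import argparse


def parse_paths(argv):
    # Parse the hypothetical -inputfile/-outputfile flags that
    # ExecuteStreamCommand or ExecuteProcess would pass on the command line.
    parser = argparse.ArgumentParser()
    parser.add_argument("-inputfile", required=True)
    parser.add_argument("-outputfile", required=True)
    return parser.parse_args(argv)


# In the real script the entry point would be:
#     import sys
#     args = parse_paths(sys.argv[1:])
#     src_json, tgt_csv = args.inputfile, args.outputfile
# replacing the hard-coded paths, followed by the existing conversion code.

# Example of what NiFi would effectively invoke:
args = parse_paths(["-inputfile", "/tmp/in.json", "-outputfile", "/tmp/out.csv"])
print(args.inputfile)   # /tmp/in.json
```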

Upvotes: 1
