whatsinthename

Reputation: 2157

How to read files in ExecuteStreamCommand processor in NiFi

My end goal is to mask the data in one particular file while moving files from one location to another. The masking has to be done by a Python script during the transfer, so I designed the flow below:

GetFile > ExecuteStreamCommand > PutFile

I wrote a Python script using pandas. NiFi runs on a virtual machine on Google Cloud Platform, where I have installed Python 2.7 and NiFi 1.9.1. Below is my pandas code:

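import pandas as pd
# Read the input file: space-separated, no header row
readFile = pd.read_csv("/path", sep=" ", header=None)
readFile.columns = ['IP']
# Mask every digit that follows another digit or a dot with 'X'
readFile['IP'] = readFile['IP'].replace(regex=r'((?<=[0-9])[0-9]|(?<=\.)[0-9])', value='X')
# Write the masked data back out
readFile.to_csv("/path", sep=' ')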

I have the following questions:
1) Using the GetFile processor, I pass the file through the queue to the next processor, i.e. ExecuteStreamCommand.
2) In my Python code I read the data from the same input directory that GetFile is configured with, but by then the file has been moved into the queue between GetFile and ExecuteStreamCommand. How will the script read it?
3) After the Python script has run, how can I use a PutFile processor to save the result somewhere else?

I am new to NiFi, so I am trying to understand the basics. I have also attached the flow and an error screenshot.

Upvotes: 0

Views: 3450

Answers (2)

KReddy

Reputation: 3

You may need to place the Python .py file in a volume that NiFi has access to,

for example opt/nifi/nifi-current/ if it's a Docker image.

Upvotes: 0

daggett

Reputation: 28634

ExecuteStreamCommand

The content of the flow file is passed to the command (python in your case) as its stdin stream,

so you have to read from stdin instead of from a file path:

readFile = pd.read_csv(sys.stdin, sep=" ", header=None)
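A fuller version of that idea, as a minimal sketch rather than a definitive script: it reads the flow file content from stdin and writes the masked result to stdout, which ExecuteStreamCommand turns into the content of the flow file on its "output stream" relationship, so PutFile can be connected there. It uses pd.read_csv rather than read_json on the assumption that the input is the space-separated, one-IP-per-line file from the question. The names mask_stream and MASK_PATTERN are made up for illustration.

```python
import sys

import pandas as pd

# Regex taken from the question: masks every digit that follows
# another digit or a dot, so only the leading digit survives.
MASK_PATTERN = r'((?<=[0-9])[0-9]|(?<=\.)[0-9])'


def mask_stream(src, dst):
    """Read IPs from src (one per line), mask them, write them to dst."""
    try:
        # Assumption: a single column with no header row.
        frame = pd.read_csv(src, sep=" ", header=None, names=["IP"], dtype=str)
    except pd.errors.EmptyDataError:
        return  # empty flow file: nothing to mask
    frame["IP"] = frame["IP"].replace(regex=MASK_PATTERN, value="X")
    # No header and no index, so the output has the same shape as the input.
    frame.to_csv(dst, sep=" ", header=False, index=False)


if __name__ == "__main__":
    # stdin carries the incoming flow file content,
    # stdout becomes the outgoing flow file content.
    mask_stream(sys.stdin, sys.stdout)
```

With this shape the script no longer needs to know where GetFile picked the file up, because it never opens a path itself.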

On the other hand, if you only need to apply a regex replace to the flow file content, you could try the ReplaceText processor instead of ExecuteStreamCommand.
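To see what that replacement does before moving it into ReplaceText, here is a small sketch using only Python's standard re module. The pattern is the one from the question; mask_line is a made-up helper name and the sample IP is invented. In ReplaceText the pattern would presumably go into the Search Value property and X into Replacement Value, though the processor uses Java regular expressions, so treat that as something to verify rather than a tested configuration.

```python
import re

# Same pattern as in the question: a digit preceded by a digit or by a dot.
MASK_PATTERN = re.compile(r'((?<=[0-9])[0-9]|(?<=\.)[0-9])')


def mask_line(line):
    """Replace every matching digit with 'X'; only the first digit stays."""
    return MASK_PATTERN.sub('X', line)


print(mask_line("192.168.1.10"))  # prints 1XX.XXX.X.XX
```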

Upvotes: 1
