gary

Reputation: 508

Running Python code in Apache NiFi ExecuteStreamCommand

I'm trying to run Python code in the NiFi ExecuteStreamCommand processor.

The code uses modules that are not pure Python, such as Pandas and NumPy, so NiFi's ExecuteScript processor is not an option.

The issue is reading the incoming flow file and modifying its content.

Apparently it is possible to read the incoming flow file from STDIN and write the result to STDOUT; see this SO question: Python Script using ExecuteStreamCommand

But I have not been able to get this working.

1. I tried simply reading a CSV from STDIN and modifying it, but when the result is sent to a PutFile processor the file is unchanged.

import sys
import pandas as pd
import io

df = pd.read_csv(io.StringIO(sys.stdin.read(1)))
df2 = pd.DataFrame([[5, 6], [7, 8]], columns=list('AB'))
df2 = df.append(df2)

2. I tried wrapping some other code in a function and returning the result, on the assumption that the function's return value would go to STDOUT, but the outcome was the same.

def convert_csv_dataframe():
    a = pd.read_csv(io.StringIO(sys.stdin.read(1)))
    a.replace(["ABC", "AB"], "A", inplace=True)
    return a

convert_csv_dataframe()

If anybody can help, it would be much appreciated.

EDIT:

The code below works. The issue was in NiFi: I was reading from the "original" relationship instead of the "output stream" relationship. Note that it reads only one line from STDIN, but I don't think that should make a difference here. The only question I have left is: can I reference the flow file itself (not its contents) from ExecuteStreamCommand?

import sys

a = sys.stdin.readline()
a = a.upper()
sys.stdout.write(a)
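
One caveat on the snippet above: readline() consumes only the first line, so for multi-line content only that line would end up in the output. A minimal sketch that handles the whole content, assuming the same uppercase transformation (the name upper_all is just illustrative):

```python
import sys


def upper_all(src, dst):
    """Copy every line from src to dst, uppercased."""
    for line in src:
        dst.write(line.upper())


if __name__ == "__main__":
    # Flow file content arrives on STDIN; whatever is written
    # to STDOUT becomes the content of the outgoing flow file.
    upper_all(sys.stdin, sys.stdout)
```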

Upvotes: 2

Views: 4165

Answers (1)

Bryan Bende

Reputation: 18630

I think you need to write to STDOUT somewhere in your script. I don't know much Python, but both examples look like you read from STDIN and then modify data in memory, but never write it back out.
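
A minimal sketch of what that read, modify, write pattern could look like, based on the second snippet in the question. It uses only the standard library csv module so it runs anywhere; the function name replace_values and its default arguments are illustrative assumptions, not anything NiFi requires. With pandas, the same roles should be filled by pd.read_csv(sys.stdin) for reading and df.to_csv(sys.stdout, index=False) for writing.

```python
import csv
import sys


def replace_values(src, dst, old=("ABC", "AB"), new="A"):
    """Copy CSV rows from src to dst, replacing any cell found in `old` with `new`."""
    reader = csv.reader(src)
    writer = csv.writer(dst, lineterminator="\n")
    for row in reader:
        writer.writerow([new if cell in old else cell for cell in row])


if __name__ == "__main__":
    # Read the flow file content from STDIN and, crucially,
    # write the modified rows back out to STDOUT.
    replace_values(sys.stdin, sys.stdout)
```

Returning a value from a function, or modifying a DataFrame in memory, does not send anything to STDOUT; only an explicit write (or print) does.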

Upvotes: 1
