Pritish Kamath

Reputation: 159

NiFi: getting an error in ExecuteScript

I have been trying to extract data from a flow file in NiFi's ExecuteScript processor and attach it to the flow file as attributes. I have tried many sources, especially Matt Burgess's funnifi blog.

Here is my code:

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

originalFlowFile = session.get()
text = IOUtils.toString(originalFlowFile)
log.info(text)

if(originalFlowFile != None):
    event = json.loads(text)
    if (event['true'] == 'Y'):
        flowfile = session.putAttribute(flowfile, "true", "Y")
    elif (event['src'] == 'ONE' ):
        allAttributes = { "true": "N", "src": "ONE" }
        flowfile = session.putAllAttributes(flowfile, allAttributes)
    elif (event['src'] == 'TWO' ):
        allAttributes = { "true": "N", "src": "TWO" }
        flowfile = session.putAllAttributes(flowfile, allAttributes)
    session.transfer(flowFile, REL_SUCCESS)
    session.commit()

The script is in Python. The flow file content is valid JSON, but I am still unable to parse it.

The INFO part is the output of

text = IOUtils.toString(originalFlowFile)

The error I am getting

Any help would be greatly appreciated.

P.S. I am unfamiliar with Python.

Test data

{
  "true":"N",
  "src":"ONE",
  "var1":"value1",
  "var2":"value2"
}

UPDATE

My updated code, which still does not work:

import json
import java.io
from org.apache.commons.io import IOUtils

originalFlowFile = session.get()

if(originalFlowFile != None):
    inputStream = session.read(originalFlowFile)
    text = IOUtils.toString(inputStream)
    log.info(text)
    event = json.loads(text)
     if (event['true'] == 'Y'):
        flowfile = session.putAttribute(flowfile, "true", "Y")
    elif (event['src'] == 'ONE' ):
        allAttributes = { "true": "N", "src": "ONE" }
        flowfile = session.putAllAttributes(flowfile, allAttributes)
    elif (event['src'] == 'TWO' ):
        allAttributes = { "true": "N", "src": "TWO" }
        flowfile = session.putAllAttributes(flowfile, allAttributes)
    session.transfer(flowFile, REL_SUCCESS)
    session.commit()

Upvotes: 0

Views: 3125

Answers (1)

Bryan Bende

Reputation: 18670

Calling IOUtils.toString() on the flow file itself is unlikely to work, since a flow file is not an InputStream, a Reader, or anything else that can be read on its own. I'm actually surprised that line doesn't throw an exception.

There are two ways to get the content of a flow file.

The first is by getting the InputStream for a flow file from the session:

originalFlowFile = session.get();
inputStream = session.read(originalFlowFile);
text = IOUtils.toString(inputStream);

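The second is by using an InputStreamCallback (shown here in Groovy syntax). This overload of session.read() returns nothing, so its result should not be assigned back to flowFile:

session.read(flowFile, {inputStream ->
   // read the inputStream
} as InputStreamCallback);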
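
For the Python script in the question, it may help to move the JSON handling into a plain function that can be checked outside NiFi. The following is only a sketch under stated assumptions: attributes_for is a hypothetical helper name (not part of NiFi), and the commented wiring assumes the usual ExecuteScript bindings (session, REL_SUCCESS) and a NiFi version in which session.read(flowFile) returns an InputStream. It also uses a single variable name, flowFile, throughout; the question's script mixes originalFlowFile, flowfile and flowFile, which Python treats as three different names.

```python
import json


def attributes_for(text):
    """Hypothetical helper: map the JSON text of a flow file to the
    attributes that should be attached, following the question's rules."""
    event = json.loads(text)
    # .get() is used instead of event['true'] so a missing key yields
    # None rather than raising KeyError.
    if event.get('true') == 'Y':
        return {'true': 'Y'}
    src = event.get('src')
    if src in ('ONE', 'TWO'):
        return {'true': 'N', 'src': src}
    # No rule matched: attach nothing.
    return {}


# Assumed wiring inside ExecuteScript (Jython); not runnable outside NiFi:
#
# from org.apache.commons.io import IOUtils
# from java.nio.charset import StandardCharsets
#
# flowFile = session.get()
# if flowFile is not None:
#     inputStream = session.read(flowFile)
#     text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
#     inputStream.close()
#     attrs = attributes_for(text)
#     if attrs:
#         flowFile = session.putAllAttributes(flowFile, attrs)
#     session.transfer(flowFile, REL_SUCCESS)
```

With the test data from the question, attributes_for returns the "true" and "src" attributes for the ONE branch. Reading the content only after the None check avoids touching a flow file that was never returned by session.get().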

Upvotes: 3
