Reputation: 159
I have been trying to extract data from a flowfile's content in NiFi's ExecuteScript processor and attach it to the flowfile as attributes. I have tried many of the sources out there, especially Matt Burgess's funnifi blog.
The following is my code:
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
originalFlowFile = session.get()
text = IOUtils.toString(originalFlowFile)
log.info(text)
if(originalFlowFile != None):
    event = json.loads(text)
    if (event['true'] == 'Y'):
        flowfile = session.putAttribute(flowfile, "true", "Y")
    elif (event['src'] == 'ONE' ):
        allAttributes = { "true": "N", "src": "ONE" }
        flowfile = session.putAllAttributes(flowfile, allAttributes)
    elif (event['src'] == 'TWO' ):
        allAttributes = { "true": "N", "src": "TWO" }
        flowfile = session.putAllAttributes(flowfile, allAttributes)
    session.transfer(flowFile, REL_SUCCESS)
session.commit()
This is in Python. The flowfile content IS JSON, yet I am still unable to parse it.
The INFO part is the output of
text = IOUtils.toString(originalFlowFile)
Any help would be greatly appreciated.
P.S. I am unfamiliar with Python.
Test data
{
"true":"N",
"src":"ONE",
"var1":"value1",
"var2":"value2"
}
UPDATE
My updated code, which still does not work:
import json
import java.io
from org.apache.commons.io import IOUtils
originalFlowFile = session.get()
if(originalFlowFile != None):
    inputStream = session.read(originalFlowFile)
    text = IOUtils.toString(inputStream)
    log.info(text)
    event = json.loads(text)
    if (event['true'] == 'Y'):
        flowfile = session.putAttribute(flowfile, "true", "Y")
    elif (event['src'] == 'ONE' ):
        allAttributes = { "true": "N", "src": "ONE" }
        flowfile = session.putAllAttributes(flowfile, allAttributes)
    elif (event['src'] == 'TWO' ):
        allAttributes = { "true": "N", "src": "TWO" }
        flowfile = session.putAllAttributes(flowfile, allAttributes)
    session.transfer(flowFile, REL_SUCCESS)
session.commit()
Upvotes: 0
Views: 3125
Reputation: 18670
Calling IOUtils.toString() on the flow file itself is likely not going to work since the flow file is not an InputStream, or Reader, or something that can be read on its own. I'm actually surprised that line doesn't produce an exception.
There are two ways you could get the content of a flow file...
The first is by getting the InputStream for a flow file from the session:
originalFlowFile = session.get();
inputStream = session.read(originalFlowFile);
text = IOUtils.toString(inputStream);
The second is by using an InputStreamCallback (Groovy syntax; this overload of session.read() returns void, so there is nothing to assign back to flowFile):
session.read(flowFile, {inputStream ->
    // read the inputStream
} as InputStreamCallback);
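Once the content has been read into a string with either approach, the JSON parsing and the choice of attributes can live in a plain function that runs outside NiFi as well. The following is only a sketch of the branching from the question, not a tested NiFi script; the function name attributes_for_event is hypothetical, and it assumes the content is a single JSON object like the test data above.

```python
import json


def attributes_for_event(text):
    """Return the attributes to attach for one JSON event (hypothetical helper).

    Mirrors the if/elif chain from the question: a "true" value of "Y" wins,
    otherwise the "src" value decides. An empty dict means "add nothing".
    """
    event = json.loads(text)
    # .get() avoids a KeyError when a field is missing from the event
    if event.get("true") == "Y":
        return {"true": "Y"}
    if event.get("src") == "ONE":
        return {"true": "N", "src": "ONE"}
    if event.get("src") == "TWO":
        return {"true": "N", "src": "TWO"}
    return {}


# The test data from the question, as one string
sample = '{"true": "N", "src": "ONE", "var1": "value1", "var2": "value2"}'
print(attributes_for_event(sample))
```

Inside ExecuteScript the returned dict would presumably be passed to session.putAllAttributes() after the input stream has been closed, and the flow file returned by that call is the one to transfer. Note also that the script in the question mixes the names originalFlowFile, flowfile and flowFile; Python is case-sensitive and treats these as three different variables, so one name should be used throughout.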
Upvotes: 3