Reputation: 149
I have following flow:
ListFTP -> RouteOnAttribute -> FetchFTP -> UnpackContent -> ExecuteScript
.
Some of files are stuck in queue UnpackContent -> ExecuteScript
.
ExecuteScript
ate some flowfiles and they just disappeared: failure
and success
relationships are empty. It just showed some activity in Tasks/Time
field. All of them stuck in queue before ExecuteScript
. I tried to empty queue, but not all of flowfiles have been deleted from this queue. About 1/3 of them still stuck in queue. I tried to disable all processors and empty queue again but it returns: 0 FlowFiles (0 bytes) were removed from the queue.
When i'm trying to change Connection destionation it returns:
Cannot change destination of Connection because FlowFiles from this Connection are currently held by ExecuteScript[id=d33c9b73-0177-1000-5151-83b7b938de39]
ExecuScript
from this answer (uses Python).
So, I can't empty queue because its always return message that there is no any flowfile, and i can't remove connection. This has been going on for several hours.
Scheduling is set to 0 sec, no penalties for flowfiles, etc.
Is it script problem?
UPDATE
Changed script to:
flowFile = session.get()
if (flowFile != None):
# All processing code starts at this indent
if errorOccurred:
session.transfer(flowFile, REL_FAILURE)
else:
session.transfer(flowFile, REL_SUCCESS)
# implicit return at the end
Same result.
UPDATE v2
I set concurent tasks to 50 and then ran ExecuteScript
again and terminated it. I got this error:
UPDATE v3
I created additional ExecuteScript processor with same script and it works fine. But after i stopped this new processor and create new flowfiles, this processor now have same problems: it's just stuck.
Hilarious. Is ExecuteScript
for single use?
Upvotes: 0
Views: 2062
Reputation: 11
You need to modify Your code in nifi-1.13.2 because NIFI-8080 caused these bugs. Or you just use nifi 1.12.1
JythonScriptEngineConfigurator:
@Override
public Object init(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException {
// Always compile when first run
if (engine != null) {
// Add prefix for import sys and all jython modules
prefix = "import sys\n"
+ Arrays.stream(modulePaths).map((modulePath) -> "sys.path.append(" + PyString.encode_UnicodeEscape(modulePath, true) + ")")
.collect(Collectors.joining("\n"));
}
return null;
}
@Override
public Object eval(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException {
Object returnValue = null;
if (engine != null) {
returnValue = ((Compilable) engine).compile(prefix + scriptBody).eval();
}
return returnValue;
}
Upvotes: 1