funnelCONN

Reputation: 149

Flowfiles stuck in queue (Apache NiFi)

I have the following flow:

ListFTP -> RouteOnAttribute -> FetchFTP -> UnpackContent -> ExecuteScript.

Some of the files are stuck in the queue between UnpackContent and ExecuteScript.

ExecuteScript consumed some flowfiles and they simply disappeared: both the failure and success relationships are empty, and the processor only showed some activity in the Tasks/Time field. The remaining flowfiles are stuck in the queue before ExecuteScript. I tried to empty the queue, but not all flowfiles were deleted; about a third of them are still there. I then disabled all processors and tried to empty the queue again, but it reports: 0 FlowFiles (0 bytes) were removed from the queue.

When I try to change the connection's destination, it returns:

Cannot change destination of Connection because FlowFiles from this Connection are currently held by ExecuteScript[id=d33c9b73-0177-1000-5151-83b7b938de39]

The ExecuteScript script is taken from this answer (it uses Python).

So I can't empty the queue, because it always reports that no flowfiles were removed, and I can't remove the connection. This has been going on for several hours.

Connection configuration: [screenshot]

Scheduling is set to 0 sec, no penalties for flowfiles, etc.

Is it a problem with the script?

UPDATE

Changed script to:

flowFile = session.get()
if flowFile is not None:
    # All processing code starts at this indent
    # (errorOccurred is a placeholder that the processing code is expected to set)
    if errorOccurred:
        session.transfer(flowFile, REL_FAILURE)
    else:
        session.transfer(flowFile, REL_SUCCESS)
# implicit return at the end

Same result.
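
To check the template's control flow outside NiFi, a minimal sketch with stand-in objects can help. `FakeSession`, `run_script`, and the string values used for `REL_SUCCESS` and `REL_FAILURE` are hypothetical stubs, not the NiFi API; they only mimic `session.get()` and `session.transfer()` so the branch logic can be run in plain Python.

```python
# Hypothetical stand-ins for the objects NiFi binds into ExecuteScript.
# They are NOT the real NiFi classes; they only record what the script does.
REL_SUCCESS = "success"
REL_FAILURE = "failure"


class FakeSession(object):
    def __init__(self, queued):
        self.queued = list(queued)   # flowfiles waiting in the incoming queue
        self.transferred = []        # (flowfile, relationship) pairs

    def get(self):
        # Return the next queued flowfile, or None when the queue is empty.
        return self.queued.pop(0) if self.queued else None

    def transfer(self, flow_file, relationship):
        self.transferred.append((flow_file, relationship))


def run_script(session, error_occurred=False):
    # Same shape as the template: get one flowfile, route it exactly once.
    flow_file = session.get()
    if flow_file is not None:
        if error_occurred:
            session.transfer(flow_file, REL_FAILURE)
        else:
            session.transfer(flow_file, REL_SUCCESS)


session = FakeSession(["a.txt", "b.txt"])
run_script(session)
run_script(session, error_occurred=True)
run_script(session)  # queue is empty now, so nothing is transferred
print(session.transferred)
```

In this sketch every flowfile taken from the queue ends up in exactly one relationship, which suggests the template's branching is not what keeps flowfiles held.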

UPDATE v2

I set concurrent tasks to 50, then ran ExecuteScript again and terminated it. I got this error:

[screenshot of the error]

UPDATE v3

I created an additional ExecuteScript processor with the same script and it worked fine. But after I stopped this new processor and created new flowfiles, it now has the same problem: it is just stuck.

Hilarious. Is ExecuteScript for single use?

Upvotes: 0

Views: 2062

Answers (1)

Kevin Sky

Reputation: 11

You need to modify the code in NiFi 1.13.2, because the change made in NIFI-8080 caused these bugs. Alternatively, just use NiFi 1.12.1.

Changes in JythonScriptEngineConfigurator:

// Assumed instance field: holds the import prefix built in init()
private String prefix = "";

@Override
public Object init(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException {
    // Do not compile here; only build the prefix that imports sys and adds all Jython module paths
    if (engine != null) {
        prefix = "import sys\n"
                + Arrays.stream(modulePaths).map((modulePath) -> "sys.path.append(" + PyString.encode_UnicodeEscape(modulePath, true) + ")")
                .collect(Collectors.joining("\n"))
                + "\n"; // end the last prefix line so the script body starts on its own line
    }
    return null;
}

@Override
public Object eval(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException {
    Object returnValue = null;
    if (engine != null) {
        // Compile prefix + script body on every evaluation instead of reusing a cached compiled script
        returnValue = ((Compilable) engine).compile(prefix + scriptBody).eval();
    }
    return returnValue;
}
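
The idea behind this change, building the `sys.path` prefix once and compiling prefix plus script body on every evaluation, can be sketched in plain Python. This is only an illustration of the pattern, not NiFi code: `ScriptConfigurator`, `init`, `eval_script`, and the module path are hypothetical, and Python's built-in `compile()` and `exec()` stand in for the Java `Compilable` interface.

```python
class ScriptConfigurator(object):
    """Hypothetical analogue of the configurator methods shown above."""

    def __init__(self):
        self.prefix = ""

    def init(self, module_paths):
        # Build the import prefix once; repr() escapes each path as a
        # Python string literal.
        lines = ["import sys"]
        lines += ["sys.path.append(%s)" % repr(p) for p in module_paths]
        self.prefix = "\n".join(lines) + "\n"

    def eval_script(self, script_body, bindings):
        # Compile prefix + body on every call, so no compiled state is
        # shared between evaluations.
        code = compile(self.prefix + script_body, "<script>", "exec")
        namespace = dict(bindings)
        exec(code, namespace)
        return namespace


cfg = ScriptConfigurator()
cfg.init(["/opt/example/modules"])  # hypothetical module path
first = cfg.eval_script("result = value * 2", {"value": 21})
second = cfg.eval_script("result = value * 2", {"value": 5})
print(first["result"], second["result"])
```

Each call gets a fresh compiled object and a fresh namespace, so a later evaluation cannot pick up leftovers from an earlier one. The trade-off is the cost of recompiling on every run.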

Upvotes: 1
