riccamini

Reputation: 1201

Apache NiFi - NullPointerException setting more than one thread on custom processors

I am currently working on a NiFi flow that requires custom processors to apply transformations to CSV records.

I noticed the following behavior while running some benchmarks: if only one thread is assigned to each custom processor, everything works well. Assigning more threads to the custom processors results in a "failed to process session" error caused by a java.lang.NullPointerException.

Since the error cannot be reproduced with a single thread, I suspect either a problem in how I handle the flowfiles or some aspect of NiFi I am not aware of.

Processing is performed by accessing the flowfile attributes. The flowfile content is never read, and the output flowfile is returned after adding some attributes. The following is a snippet of the relevant parts of the processor code:

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

    //  Will hold all the processed attributes
    Map<String, String> processedAttributes = new HashMap<>();
    FlowFile flowfile = session.get();
    ...
    //  Adds the attributes to the flowfile
    flowfile = session.putAllAttributes(flowfile, processedAttributes);
    session.transfer(flowfile, PROCESSED);
}

I am running NiFi 0.7 on an m4.4xlarge Amazon EC2 instance. Since I am seeking high performance (who doesn't?), I am looking for a safe way to increase the number of threads. Any suggestions are really appreciated.

Thank you in advance.

Upvotes: 1

Views: 5648

Answers (1)

mattyb

Reputation: 12093

It is possible (especially with multiple threads / concurrent tasks) that session.get() will return null for some threads. This happens when two or more threads are scheduled because a flow file is available: one thread gets the flow file (via session.get()), and the next thread gets null.
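To make that race concrete, here is a rough sketch that does not use NiFi at all. It assumes a plain ConcurrentLinkedQueue as a stand-in for the processor's input queue and poll() as a stand-in for session.get(); the class and method names (SessionGetRace, countNullPolls) are made up for illustration.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SessionGetRace {

    // Hypothetical model, NOT NiFi code: the queue stands in for the
    // processor's input queue and poll() stands in for session.get().
    static int countNullPolls(int queuedItems, int workers) {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < queuedItems; i++) {
            queue.add("flowfile-" + i);
        }

        AtomicInteger nullPolls = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            pool.execute(() -> {
                // poll() returns null when the queue is already empty,
                // much like a thread that lost the race for the flow file.
                String item = queue.poll();
                if (item == null) {
                    nullPolls.incrementAndGet();
                }
            });
        }

        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return nullPolls.get();
    }

    public static void main(String[] args) {
        // One queued item, two workers: one worker always comes up empty.
        System.out.println("null polls: " + countNullPolls(1, 2));
    }
}
```

With one queued item and two workers, exactly one worker sees null regardless of which thread wins, which is the situation a second concurrent task can run into.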

In your case you may not be reading or writing the flow file content, but other methods (like session.putAllAttributes()) call methods on the flow file. Many processors check whether flowfile == null and return immediately if so, since they need a flow file to operate on. Perhaps that would fix your issue as well.
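The null check could look roughly like the sketch below. To keep it runnable without the NiFi libraries, it uses small hypothetical stand-ins (FakeFlowFile, FakeSession) rather than the real ProcessSession and FlowFile types, and the attribute name is invented. In the actual processor, the same guard would go right after `FlowFile flowfile = session.get();`.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

public class NullGuardSketch {

    /** Hypothetical stand-in for a flow file: just a bag of attributes. */
    static final class FakeFlowFile {
        final Map<String, String> attributes = new HashMap<>();
    }

    /** Hypothetical stand-in for a session; get() returns null when nothing is queued. */
    static final class FakeSession {
        final Queue<FakeFlowFile> input = new ArrayDeque<>();
        final Queue<FakeFlowFile> processed = new ArrayDeque<>();

        FakeFlowFile get() {
            return input.poll();
        }

        // Simplification: mutates in place; the real call returns the updated flow file.
        void putAllAttributes(FakeFlowFile flowfile, Map<String, String> attrs) {
            flowfile.attributes.putAll(attrs);
        }

        void transfer(FakeFlowFile flowfile) {
            processed.add(flowfile);
        }
    }

    /** Mirrors the shape of onTrigger; returns true if a flow file was processed. */
    static boolean onTrigger(FakeSession session) {
        FakeFlowFile flowfile = session.get();
        if (flowfile == null) {
            // Another thread already took the flow file: nothing to do.
            return false;
        }

        Map<String, String> processedAttributes = new HashMap<>();
        processedAttributes.put("example.attribute", "value"); // invented attribute
        session.putAllAttributes(flowfile, processedAttributes);
        session.transfer(flowfile);
        return true;
    }

    public static void main(String[] args) {
        FakeSession session = new FakeSession();
        session.input.add(new FakeFlowFile());
        System.out.println(onTrigger(session)); // a flow file was available
        System.out.println(onTrigger(session)); // queue empty, guard returns early
    }
}
```

Returning early when there is no flow file means the later attribute and transfer calls are never reached with a null reference, which is where the NullPointerException would otherwise surface.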

Upvotes: 3
