spoof

Reputation: 65

How to extract and manipulate data within a NiFi processor

I'm trying to write a custom NiFi processor that takes the contents of the incoming flow file, performs some math operations on it, and writes the results to an outgoing flow file. Is there a way to read the contents of the incoming flow file into a String or something similar? I've been searching for a while and it doesn't seem that simple. If anyone could point me toward a good tutorial on doing something like this, it would be greatly appreciated.

Upvotes: 0

Views: 2475

Answers (1)

Andy

Reputation: 14194

The Apache NiFi Developer Guide documents the process of creating a custom processor very well. In your specific case, I would start with the Component Lifecycle section and the Enrich/Modify Content pattern. Other processors that do similar work (like ReplaceText or Base64EncodeContent) are good examples to learn from; all of the source code is available on GitHub.

Essentially you need to implement the #onTrigger() method in your processor class, read the flowfile content and parse it into your expected format, perform your operations, and then re-populate the resulting flowfile content. Your source code will look something like this:

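    // Imports needed by this snippet (IOUtils comes from Apache Commons IO)
    import java.nio.charset.StandardCharsets;
    import java.text.DecimalFormat;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicReference;
    import org.apache.commons.io.IOUtils;
    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.logging.ComponentLog;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.exception.ProcessException;

    @Override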
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final ComponentLog logger = getLogger();
        AtomicBoolean error = new AtomicBoolean();
        AtomicReference<String> result = new AtomicReference<>(null);

        // The lambda stands in for an InputStreamCallback implementation (its process() method)
        session.read(flowFile, in -> {
            long start = System.nanoTime();

            // Read the flowfile content into a String
            // TODO: May need to buffer this if the content is large
            try {
                final String contents = IOUtils.toString(in, StandardCharsets.UTF_8);
                result.set(new MyMathOperationService().performSomeOperation(contents));

                long stop = System.nanoTime();
                if (logger.isDebugEnabled()) {
                    final long durationNanos = stop - start;
                    DecimalFormat df = new DecimalFormat("#.###");
                    logger.debug("Performed operation in " + durationNanos + " nanoseconds (" + df.format(durationNanos / 1_000_000_000.0) + " seconds).");
                }
            } catch (Exception e) {
                error.set(true);
                logger.error(e.getMessage() + " Routing to failure.", e);
            }
        });

        if (error.get()) {
            session.transfer(flowFile, REL_FAILURE);
        } else {
            // Again, a lambda takes the place of OutputStreamCallback#process(),
            // which receives only the output stream
            FlowFile updatedFlowFile = session.write(flowFile, out -> {
                final String resultString = result.get();
                final byte[] resultBytes = resultString.getBytes(StandardCharsets.UTF_8);

                // TODO: For large results, write in chunks with a loop instead of one big array
                out.write(resultBytes, 0, resultBytes.length);
                out.flush();
            });
            session.transfer(updatedFlowFile, REL_SUCCESS);
        }
    }
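
The `MyMathOperationService` in the snippet is only a placeholder. As a rough sketch of what it might look like, here is a JDK-only version that assumes the flowfile content is UTF-8 text with one decimal number per line. The content format, the sum operation, and the `doubleEachLine` method are all assumptions made up for illustration; none of this is part of NiFi.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the MyMathOperationService placeholder.
// Assumption: the content is UTF-8 text with one decimal number per line.
class MyMathOperationService {

    // Whole-content variant: sums every non-blank line and returns the sum as text.
    // A malformed line throws NumberFormatException, which the caller must handle.
    String performSomeOperation(String contents) {
        double sum = 0.0;
        for (String line : contents.split("\\R")) {
            String trimmed = line.trim();
            if (!trimmed.isEmpty()) {
                sum += Double.parseDouble(trimmed);
            }
        }
        return Double.toString(sum);
    }

    // Streaming variant (hypothetical): doubles each number line by line,
    // so the full content never has to be held in memory at once.
    void doubleEachLine(InputStream in, OutputStream out) throws IOException {
        BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
        String line;
        while ((line = reader.readLine()) != null) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue; // skip blank lines
            }
            writer.write(Double.toString(Double.parseDouble(trimmed) * 2));
            writer.write('\n');
        }
        // Flush but do not close: the caller owns both streams.
        writer.flush();
    }

    public static void main(String[] args) throws IOException {
        MyMathOperationService service = new MyMathOperationService();
        String content = "1\n2\n3.5\n";

        System.out.println(service.performSomeOperation(content)); // prints 6.5

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        service.doubleEachLine(new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)), out);
        System.out.print(new String(out.toByteArray(), StandardCharsets.UTF_8));
    }
}
```

The streaming method takes an `InputStream` and an `OutputStream`, the same shape as `StreamCallback#process()`, so something like `session.write(flowFile, (in, out) -> service.doubleEachLine(in, out))` should let you transform large content without reading it all into a String first.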

Daggett is right that the ExecuteScript processor is a good place to start: it shortens the development cycle (no building NARs, deploying, and restarting NiFi to try a change), and once the behavior is correct you can copy the logic into the generated processor skeleton and deploy it once.

Upvotes: 1
