Leibnitz

Reputation: 355

NiFi: Manually combine multiple flowfiles based on an attribute

NiFi Version - 1.8

I would like to manually merge multiple flowfiles based on an attribute.

I know this can easily be done with the MergeRecord processor by setting the Correlation Attribute Name property, but I would like to do it with a Groovy script (ExecuteGroovyScript/ExecuteScript).

The flowfile count would be fewer than 20, so after merging on that specific attribute there should be fewer than 8 flowfiles.

Is there an efficient way of doing this manually with a Groovy script? Please point me in the right direction if I have to do this with a script.

Upvotes: 2

Views: 2037

Answers (1)

Andy

Reputation: 14194

As pointed out, there are standard processors that accomplish this objective, and Max Bin Age is an optional property, so you don't need to set it.

If you insist on doing this manually with a script, you would write a script which acquires multiple flowfiles from the session using def flowfiles = session.get(100) (tune the batch size to ensure you get all the related flowfiles from the incoming queue while not taxing your heap), filters and groups them by the specific attribute, then concatenates the contents of each group into the content of a new flowfile. Update the incoming flowfiles as needed and include all their UUIDs in an attribute of the merged flowfile. Then transfer the source flowfiles that were merged to an "original" relationship, and each merged flowfile to a custom "merged" relationship (you'll need to use InvokeScriptedProcessor to provide custom relationships). Any flowfiles which you did not merge will need to be returned to the incoming queue.
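
As a rough illustration of the steps above, here is a minimal sketch of an ExecuteScript (Groovy) body. It is not a tested replacement for MergeContent, and several things in it are assumptions: the attribute name merge.key and the output attributes merge.count and merge.source.uuids are made-up names. ExecuteScript only exposes REL_SUCCESS and REL_FAILURE, so this sketch sends each merged flowfile to success and removes the source flowfiles instead of routing them to an "original" relationship.

```groovy
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback

// Assumption: hypothetical name of the attribute to correlate on
final String CORRELATION_ATTR = 'merge.key'

// Pull up to 100 flowfiles from the incoming queue in one batch
def flowFiles = session.get(100)
if (!flowFiles) return

// Group by the attribute value; flowfiles without the attribute land under the null key
def groups = flowFiles.groupBy { it.getAttribute(CORRELATION_ATTR) }
def skipped = groups.remove(null)

// Flowfiles that cannot be merged go back to the queue they were pulled from
if (skipped) {
    session.transfer(skipped)
}

groups.each { key, members ->
    // Create the merged flowfile with the group members as provenance parents
    def merged = session.create(members)

    // Concatenate the content of every member, in queue order, with no delimiter
    merged = session.write(merged, { out ->
        members.each { member ->
            session.read(member, { inp -> out << inp } as InputStreamCallback)
        }
    } as OutputStreamCallback)

    // Record what was merged (attribute names here are illustrative only)
    merged = session.putAttribute(merged, CORRELATION_ATTR, key)
    merged = session.putAttribute(merged, 'merge.count', String.valueOf(members.size()))
    merged = session.putAttribute(merged, 'merge.source.uuids',
            members.collect { it.getAttribute('uuid') }.join(','))

    session.transfer(merged, REL_SUCCESS)

    // No "original" relationship is available in ExecuteScript, so drop the sources
    session.remove(members)
}
```

Note that this merges whatever happens to be in the batch. If related flowfiles arrive after the script has run, they end up in a separate merged flowfile, which is one of the edge cases the standard processors handle with bins and timeouts.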

This is only a cursory summary. There are more details and edge cases to consider. I strongly recommend against creating a custom scripted processor to do this when robust, tested solutions already exist. If you still want to write your own, look at the existing code for the MergeContent processor to understand how it works. The Apache NiFi Developer's Guide also has a section on design patterns (though it does not cover many-to-one as the standard processors do this already).

Upvotes: 3
