Ris90

Reputation: 841

Apache NiFi processor that acts like a barrier to synchronize multiple flow files

I'm evaluating NiFi for our ETL process. I want to build the following flow: fetch a large amount of data from a SQL database -> split it into chunks of 1000 records each -> count the error records in each chunk -> sum the error counts across all chunks -> if the total exceeds a threshold, fail the process -> otherwise, save each chunk to the database.

The problem I can't solve is how to wait until all chunks have been validated. For example, if I have 5 validation tasks running concurrently, I need some kind of barrier that waits until all chunks are processed and only then runs the error-count processor, because I don't want to save invalid data and then have to delete it if the threshold is reached.

My other question is whether this validation processor can run on multiple nodes in parallel while still letting me wait until all of them have completed.

Upvotes: 4

Views: 1449

Answers (2)

donald

Reputation: 478

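Updating a count in the DistributedMapCache is not the correct approach: the fetch and the update are separate operations, so they cannot be combined into a single atomic step that just increments the count. Two tasks can read the same value and each write back value + 1, which loses one of the increments.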

http://apache-nifi-users-list.2361937.n4.nabble.com/How-do-I-atomically-increment-a-variable-in-NiFi-td1084.html
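
To illustrate the race, here is a minimal standalone sketch in plain Python. It is not NiFi code: `VersionedCache`, `replace_if_unchanged`, and `atomic_increment` are hypothetical names made up for this example. It assumes a cache that keeps a revision number next to each value, so a write can be rejected when someone else updated the key after it was read, and the increment is then retried.

```python
import threading


class VersionedCache:
    """In-memory stand-in for a shared key/value cache (hypothetical, not a NiFi API)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._entries = {}  # key -> (value, revision)

    def fetch(self, key):
        # Return (value, revision); a missing key reads as (0, 0).
        with self._lock:
            return self._entries.get(key, (0, 0))

    def replace_if_unchanged(self, key, new_value, expected_revision):
        # Write only if nobody has updated the key since it was fetched.
        with self._lock:
            _, revision = self._entries.get(key, (0, 0))
            if revision != expected_revision:
                return False
            self._entries[key] = (new_value, revision + 1)
            return True


def atomic_increment(cache, key, delta=1):
    # Retry the fetch/replace pair until the write is accepted.
    while True:
        value, revision = cache.fetch(key)
        if cache.replace_if_unchanged(key, value + delta, revision):
            return value + delta
```

A plain "fetch, add one, put" sequence has no such revision check, so a stale write silently overwrites a newer one. With the check, the stale writer is told its write failed and simply tries again.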

Upvotes: 1

Andy

Reputation: 14184

One solution is to use the ExecuteScript processor as a "relief valve" that holds a simple count in memory, triggered by the first receipt of a flowfile with a specific attribute value (store it in the local/cluster state, basically as a Map from the key attribute value to its count). Once that count reaches a threshold, generate a new flowfile containing the attribute value that has finished and route it to the success relationship. Meanwhile, send the other results (the flowfiles that need to be batched) to a MergeContent processor and set the minimum batch size to whatever you like. The processor that follows the valve should have its Scheduling Strategy set to Event Driven so that it only runs when it receives a flowfile from the valve.
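
The counting logic of such a valve could look roughly like the sketch below. This is plain Python, not an actual ExecuteScript body: `BatchBarrier`, `arrive`, and the `"save"`/`"fail"` results are hypothetical names, and it assumes each chunk carries a batch identifier plus the total number of chunks in its batch (for example, attributes written when the data was split). State lives in an in-process dict here; in a real flow it would have to go into processor state or a shared cache.

```python
import threading


class BatchBarrier:
    """Counts validated chunks per batch and releases once all have arrived."""

    def __init__(self, error_threshold):
        self.error_threshold = error_threshold
        self._lock = threading.Lock()
        self._progress = {}  # batch_id -> [chunks_seen, total_errors]

    def arrive(self, batch_id, expected_chunks, error_count):
        # Record one validated chunk. Returns None while the batch is
        # incomplete, then "save" or "fail" when the last chunk arrives.
        with self._lock:
            entry = self._progress.setdefault(batch_id, [0, 0])
            entry[0] += 1
            entry[1] += error_count
            if entry[0] < expected_chunks:
                return None
            # Last chunk: forget the batch and decide its fate.
            _, total_errors = self._progress.pop(batch_id)
            return "fail" if total_errors > self.error_threshold else "save"
```

For example, with `BatchBarrier(error_threshold=5)` and a batch of three chunks, the first two calls to `arrive` return `None` and the third returns the decision for the whole batch. The lock only protects tasks within a single process; across cluster nodes the counter needs shared, atomically updated storage.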

Upvotes: 2
