Reputation: 841
I'm evaluating NiFi for our ETL process. I want to build the following flow: fetch a lot of data from a SQL database -> split it into chunks of 1000 records each -> count the error records in each chunk -> count the total number of error records -> if the total exceeds a threshold, fail the process -> otherwise, save each chunk to the database.
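The flow can be sketched in plain Python, outside NiFi, to make the required ordering explicit: every chunk is counted before anything is saved. The names `chunk`, `count_errors`, `run_flow`, `is_error` and `save_chunk` are hypothetical, and "exceeds" is assumed to mean strictly greater than the threshold.

```python
from typing import Callable, Iterable, List, Sequence


def chunk(records: Sequence, size: int = 1000) -> List[Sequence]:
    """Split records into consecutive chunks of at most `size` items."""
    return [records[i:i + size] for i in range(0, len(records), size)]


def count_errors(chunk_records: Iterable, is_error: Callable[[object], bool]) -> int:
    """Count the records in one chunk that the (hypothetical) predicate flags."""
    return sum(1 for record in chunk_records if is_error(record))


def run_flow(
    records: Sequence,
    is_error: Callable[[object], bool],
    threshold: int,
    save_chunk: Callable[[Sequence], None],
    size: int = 1000,
) -> bool:
    chunks = chunk(records, size)
    # Count every chunk first; nothing is written during validation.
    total = sum(count_errors(c, is_error) for c in chunks)
    if total > threshold:
        return False  # fail the process: no chunk has been saved
    for c in chunks:
        save_chunk(c)
    return True
```

With 2500 records this yields chunks of 1000, 1000 and 500, and the save step is reached only when the total error count is within the threshold.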
The problem I can't solve is how to wait until all chunks have been validated. If, for example, I have 5 validation tasks running concurrently, I need some kind of barrier that waits until all chunks are processed and only then runs the error-count processor, because I don't want to save invalid data and then have to delete it if the threshold is reached.
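In ordinary thread-pool terms, the barrier described here means collecting every task's result before acting on any of them. A minimal Python sketch, assuming five worker threads and a caller-supplied `is_error` predicate (both hypothetical; this illustrates the idea and is not how NiFi schedules processors):

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence, Tuple


def validate_all_then_decide(
    chunks: List[Sequence],
    is_error: Callable[[object], bool],
    threshold: int,
    workers: int = 5,
) -> Tuple[bool, int]:
    """Count errors in each chunk concurrently; decide only once all counts exist."""

    def count(chunk: Sequence) -> int:
        return sum(1 for record in chunk if is_error(record))

    with ThreadPoolExecutor(max_workers=workers) as pool:
        # list() consumes every result, so this line blocks until all
        # validation tasks have finished: it plays the role of the barrier.
        per_chunk = list(pool.map(count, chunks))

    total = sum(per_chunk)
    # Nothing has been saved yet, so a failing batch leaves no data to clean up.
    return total <= threshold, total
```

The save step would run only after this function returns `True`, so no chunk is written when the batch fails.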
My other question is whether it is possible to run this validation processor on multiple nodes in parallel and still wait until all of them have completed.
Upvotes: 4
Views: 1449
Reputation: 478
Updating a count in the distributed MapCache is not the right approach: the fetch and the update are separate operations, so they cannot be combined into an atomic processor that just increments counts.
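A toy, in-memory illustration of why separate fetch and update calls lose increments, and how a compare-and-set retry loop avoids that. `ToyCache` and its `fetch`/`put`/`replace` methods are hypothetical stand-ins, not NiFi's distributed cache client API; the two "clients" are simulated by interleaving their calls by hand so the outcome is deterministic.

```python
from threading import Lock
from typing import Dict


class ToyCache:
    """Hypothetical in-memory stand-in for a shared cache (not NiFi's client API)."""

    def __init__(self) -> None:
        self._data: Dict[str, int] = {}
        self._lock = Lock()  # each single call is atomic; a sequence of calls is not

    def fetch(self, key: str) -> int:
        with self._lock:
            return self._data.get(key, 0)

    def put(self, key: str, value: int) -> None:
        with self._lock:
            self._data[key] = value

    def replace(self, key: str, expected: int, new: int) -> bool:
        """Compare-and-set: write `new` only if the value is still `expected`."""
        with self._lock:
            if self._data.get(key, 0) != expected:
                return False
            self._data[key] = new
            return True


def atomic_increment(cache: ToyCache, key: str, inc: int) -> int:
    # Retry until no other client has changed the value between fetch and replace.
    while True:
        current = cache.fetch(key)
        if cache.replace(key, current, current + inc):
            return current + inc


def lost_update_demo(cache: ToyCache, key: str) -> int:
    a = cache.fetch(key)   # client A reads 0
    b = cache.fetch(key)   # client B also reads 0, before A has written
    cache.put(key, a + 5)  # A records 5 errors
    cache.put(key, b + 3)  # B overwrites with 3: A's 5 errors are lost
    return cache.fetch(key)


def cas_demo(cache: ToyCache, key: str) -> int:
    a = cache.fetch(key)
    b = cache.fetch(key)
    cache.replace(key, a, a + 5)          # A's write succeeds (value is still 0)
    if not cache.replace(key, b, b + 3):  # B's stale write is rejected...
        atomic_increment(cache, key, 3)   # ...so B retries with a fresh read
    return cache.fetch(key)
```

`lost_update_demo` ends with 3 instead of 8, while `cas_demo` ends with 8 because the stale write is detected and retried.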
Upvotes: 1
Reputation: 14184
One solution to this is to use the ExecuteScript processor as a "relief valve" that holds a simple count in memory, starting from the first flowfile it receives with a specific attribute value. Store this in the local or cluster state, basically as a map from each attribute value to its count. Once a count reaches a threshold, the script can generate a new flowfile containing the attribute value that has finished and route it to the success relationship. In this case, send the other results (the flowfiles that need to be batched) to a MergeContent processor and set its Minimum Number of Entries to whatever batch size you like. The processor that follows the valve should have its Scheduling Strategy set to Event Driven, so that it only runs when it receives a flowfile from the valve.
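A sketch of the valve's counting logic, written as a plain Python class rather than an actual ExecuteScript script. `ReliefValve`, `on_flowfile` and the use of a batch identifier as the key are hypothetical; the sketch keeps the map in process memory only, whereas the answer suggests keeping it in local or cluster state.

```python
from collections import defaultdict
from typing import Dict, Optional


class ReliefValve:
    """Hypothetical sketch of the valve's counting logic (not ExecuteScript's API)."""

    def __init__(self, threshold: int) -> None:
        self.threshold = threshold
        # Map of attribute value -> number of flowfiles seen with that value.
        self.counts: Dict[str, int] = defaultdict(int)

    def on_flowfile(self, key: str) -> Optional[str]:
        """Count one arriving flowfile; return `key` when its count reaches the threshold."""
        self.counts[key] += 1
        if self.counts[key] == self.threshold:
            del self.counts[key]  # reset, so the map does not grow without bound
            return key            # caller would emit a "finished" flowfile for this value
        return None
```

Deleting a key once it fires keeps the map small, at the cost that any late flowfile with the same value starts a fresh count.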
Upvotes: 2