Reputation: 603
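I have the following flow: ListFile ---> FetchFile ---> ? ExecuteScript (maybe) ---> Notify
Basically, I want to go to Notify if either of these conditions holds:
1. 200 flowfiles have arrived, or
2. about 3 hours have elapsed.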
I think the first condition is easy to achieve. I can have a Groovy script that reads the number of flowfiles and, if there are 200, routes them to SUCCESS, or otherwise rolls back the session.
But I also want to know how to check whether the time elapsed for n flowfiles in the queue (n can be less than 200) is more than 3 hours or so.
Update: Here is the problem. We currently have a batch process (~200 files, and the number may grow with the business). We have a NiFi pipeline (List, Fetch, basic validation on checksum, etc., then processing that calls the SQL), which is working fine. Per the business, corrections to the data can arrive throughout the day, so we may get all or some of the files to "re-process". That is also fine and working.
Now, under new requirements, we need to build a process that runs after this "batch" is completed. In the best case, I can use a MergeContent processor with a max bin of n and then signal or notify my new processor. However, as explained above, a few or all of the files may be processed again during the day, so my "n" may not match the number of files re-processed. Hence, even in that case, once say 3 hours have elapsed, I should notify the new process to run again, regardless of whether "n" matches the number of re-processed files. So I am looking for an "n files OR m hours elapsed" check.
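To make the "n files OR m hours elapsed" check concrete, here is a minimal sketch of the decision logic in plain Python, outside NiFi. The function name `should_notify` and its parameters are hypothetical, and it assumes the queued count and the arrival time of the oldest queued flowfile are already known; how those values are obtained inside a processor is not shown.

```python
from datetime import datetime, timedelta


def should_notify(queued_count, oldest_queued_at, now,
                  n=200, max_wait=timedelta(hours=3)):
    """Return True when either threshold is met (hypothetical helper).

    queued_count     -- number of flowfiles currently waiting
    oldest_queued_at -- datetime the oldest waiting flowfile arrived,
                        or None when nothing is waiting
    now              -- current datetime
    """
    # Condition 1: the full batch of n files has arrived.
    if queued_count >= n:
        return True
    # Condition 2: fewer than n files are waiting, but the oldest one
    # has been waiting for at least max_wait.
    if queued_count > 0 and now - oldest_queued_at >= max_wait:
        return True
    return False


if __name__ == "__main__":
    start = datetime(2024, 1, 1, 9, 0, 0)
    print(should_notify(200, start, start))
    print(should_notify(50, start, start + timedelta(hours=2)))
    print(should_notify(50, start, start + timedelta(hours=3)))
```

In a scripted processor, a True result would correspond to routing the waiting flowfiles to SUCCESS, and a False result to rolling back the session as described above.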
Upvotes: 0
Views: 2191
Reputation: 14184
I think this may be an example of an XY problem -- you're trying to solve a problem and believe that counting the number of files fetched or time elapsed will help, but this pattern is usually discouraged in Apache NiFi and there are other solutions to the original problem. I would encourage you to describe more fully the higher level problem you are trying to solve to see if there is a better solution.
I will answer the question though (none of these are ideal solutions).
1. Use a MergeContent processor with a minimum bin count of 200.
2. Use an ExecuteScript processor, as you noted.
3. Store a timestamp in a DistributedMapCacheServer when the Notify processor executes, check that value with a FetchDistributedMapCache processor against the current timestamp, and use a simple Expression Language statement to compare the timestamp values.

I think you may also want to read some examples of Wait/Notify logic, because creating thresholds like "200 incoming flowfiles || 3 hours elapsed time" is what the Wait processor does.
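As a rough illustration of the timestamp comparison in the cache-based option, here is a minimal Python sketch. A plain dict stands in for the distributed cache, and the key name and helper functions are hypothetical. In a flow, the same comparison might be written as an Expression Language statement along the lines of `${now():toNumber():minus(${last.notify.ts}):gt(10800000)}`, assuming the fetched value has been placed in an attribute called `last.notify.ts` (an assumed name).

```python
# 3 hours in milliseconds, matching epoch-millisecond timestamps.
THREE_HOURS_MS = 3 * 60 * 60 * 1000


def record_notify(cache, key, now_ms):
    """Store the time of the latest Notify as a string, the way a
    cache entry would hold it (the dict is a stand-in for the cache)."""
    cache[key] = str(now_ms)


def elapsed_exceeds(cache, key, now_ms, threshold_ms=THREE_HOURS_MS):
    """Return True when more than threshold_ms has passed since the
    stored timestamp; False when no timestamp has been stored yet."""
    stored = cache.get(key)
    if stored is None:
        return False
    return now_ms - int(stored) > threshold_ms


if __name__ == "__main__":
    cache = {}
    record_notify(cache, "last.notify.ts", 0)
    print(elapsed_exceeds(cache, "last.notify.ts", THREE_HOURS_MS + 1))
```

Treating a missing entry as "not elapsed" is a design choice in this sketch; a flow that should fire on the very first batch would need the opposite default.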
Upvotes: 1