Reputation: 1247
Does NiFi have a synchronization mechanism for knowing when a batch of data has finished processing?
I ingest some data, do some processing, and at step N-1 I want to know that all the data has been processed so that I can proceed to the (final) step N.
[GetFile / 1000 000 lines] ----> [ Proc1 / process step 0 ] -----> [ Proc2 / process step 1 ] .... [ PutSQL / insert into db ] ---> [ Proc to let me know that I've inserted all the data in the table ] ----> [ ProcN / Run aggregates on data for example ]
Upvotes: 2
Views: 1844
Reputation: 198
I may have a suggestion for you to try out. NiFi has a nice REST API that allows you to start and stop a processor. You can call this API from within NiFi using the InvokeHTTP processor. This allows you to start [ ProcN / Run aggregates on data for example ] and switch it off again after it has run. You have to make sure that this processor does not run continuously. So your processors would be:
[GetFile / 1000 000 lines] ----> [ Proc1 / process step 0 ] -----> [ Proc2 / process step 1 ] .... [ PutSQL / insert into db ] ---> [ Proc to let me know that I've inserted all the data in the table ] -----> [ InvokeHTTP to start ProcN / Run Aggregates ] --via API call--> [ ProcN / Run aggregates on data for example ] -----> [ InvokeHTTP to stop ProcN / Run Aggregates ]
We are investigating this approach to synchronize request-reply messages with a remote party and to prevent too many messages from piling up in the pipeline.
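To make the API call more concrete, here is a minimal sketch of the request that the InvokeHTTP step would need to send. It assumes the NiFi 1.x REST endpoint PUT /nifi-api/processors/{id}/run-status, and it assumes you have already fetched the processor's current revision version (for example with a GET on /nifi-api/processors/{id}). The base URL, the processor id and the helper name build_run_status_request are made up for illustration, and the request is only built here, not sent.

```python
import json
import urllib.request


def build_run_status_request(base_url, processor_id, revision_version, state):
    """Build (but do not send) a PUT request that changes a processor's run state.

    Assumption: NiFi 1.x exposes PUT {base_url}/processors/{id}/run-status and
    expects the processor's current revision version in the request body.
    """
    if state not in ("RUNNING", "STOPPED"):
        raise ValueError("state must be RUNNING or STOPPED")

    body = {
        "revision": {"version": revision_version},
        "state": state,
    }
    url = f"{base_url.rstrip('/')}/processors/{processor_id}/run-status"
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


# Hypothetical values; replace them with your own NiFi URL and processor id.
start_request = build_run_status_request(
    "http://localhost:8080/nifi-api", "abc-123", 4, "RUNNING"
)
```

In the flow itself, the same method, URL and JSON body would be configured on the InvokeHTTP processor. If the revision version is stale, NiFi should reject the request, so the version has to be read again before the stop call.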
Upvotes: 0
Reputation: 11931
NiFi does not really have an explicit synchronization feature built into the framework, but some processors have features that help synchronize activity. I can think of a few possible ways to make your flow work:
Scheduling - you could schedule the GetFile and later aggregate operation using CRON scheduling on processors, assuming that the operations are relatively predictable in duration.
MonitorActivity - the MonitorActivity processor can trigger a flowfile based on inactivity in a queue. You could use this downstream of PutSQL and trigger when inserts have stopped and the aggregates should begin.
MergeContent (Simple) - a MergeContent processor might aggregate the results of PutSQL into a single message that triggers the aggregate operation. You would have to experiment with the properties for bin size and age to get this to work right.
MergeContent (Defragment) - MergeContent has a Defragment strategy designed to correlate fragments of a larger file together. It requires specific attributes to be set on the flowfiles; see the "Reads Attributes" section at the bottom of the docs. The behavior seems close to what you want, but setting those fragment attributes may be difficult.
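For the Defragment option, the attributes in question are fragment.identifier, fragment.index and fragment.count. The sketch below is not NiFi code; it is a small Python model of the attributes each flowfile would need to carry and of the condition the merge waits for (every fragment of one batch has arrived). The function names, the batch id and the 0-based indexes are assumptions made for illustration.

```python
def fragment_attributes(batch_id, total):
    """Return the attribute map each of `total` flowfiles in one batch would carry.

    Assumption: indexes are generated 0-based here; what matters for the check
    below is only that they are unique within the batch.
    """
    return [
        {
            "fragment.identifier": batch_id,   # same value for the whole batch
            "fragment.index": str(index),      # unique position within the batch
            "fragment.count": str(total),      # how many fragments to expect
        }
        for index in range(total)
    ]


def batch_is_complete(received):
    """True once as many distinct indexes have arrived as fragment.count announces."""
    if not received:
        return False
    expected = int(received[0]["fragment.count"])
    seen = {attrs["fragment.index"] for attrs in received}
    return len(seen) == expected


# Hypothetical batch of 5 rows inserted by PutSQL.
batch = fragment_attributes("load-2024-01", 5)
partial = batch_is_complete(batch[:4])   # one fragment still missing
complete = batch_is_complete(batch)      # all fragments present
```

If the rows are produced by one of the split processors, those processors may already write these attributes, in which case nothing has to be set by hand. The merged flowfile that comes out of MergeContent can then act as the trigger for the aggregate step.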
Upvotes: 3