wederer
wederer

Reputation: 570

How to execute a processor only when another processor is not executing?

I am inserting/updating data into a table. The database system does not provide an "Upsert" functionality. Thus I am using a staging table for the insert followed by a merge into the "final" table and finally I am truncating the staging table.

This leads to a race condition. If new data is inserted into the staging table between the merge+truncate this data is lost.

How can I make sure this does not happen?

I have tried to model this via Wait/Notify, but this is not a clean solution either. The queue for the "Put Data into staging table" PutDatabaseRecord processor could be filled and "MergeVertica for Insert/Update" ExecuteSQL could still execute.

Nifi Flow

Upvotes: 0

Views: 859

Answers (1)

Up_One
Up_One

Reputation: 5271

I would use a MonitorActivity processor with a 60 or 30 sec threshold and use the Inactive output with a Continually Send Messages set to "false".

Have the success of the SQL inserts into staging connection into your MonitorActivity, this way if no activity is seen in the last X seconds he will trigger a flowfile that will start your Merge Process.

Download the template from https://codeshare.io/aJNNkn

enter image description here

Upvotes: 1

Related Questions