Rakesh Prasad
Rakesh Prasad

Reputation: 642

how can i reset wait notify counter in nifi

i have a flow where i have to wait for exact 12 files each time.

so i have written code like

getHDFS -> notify -> wait -> some processing.

notify has following config

Release Signal Identifier = eredee_45rgfyWQQWQ
Signal Counter Name = cntr_for_run
Signal Counter Delta = 1
Signal Buffer Count = 1

wait has following config

Release Signal Identifier = eredee_45rgfyWQQWQ
Target Signal Count = 12
Signal Counter Name = cntr_for_run
Wait Buffer Count = 1
Releasable FlowFile Count = 1

this works for first time, and it releases files only when total read are 12. but as soon as 13th file comes, it just passes by. I want it to work in such a way next time wait releases file when total count is 24 (i mean next 12 files). how can I reset running counter back to zero?

Upvotes: 2

Views: 2996

Answers (4)

Bryan Bende
Bryan Bende

Reputation: 18670

I haven't tried this, but you could use the PutDistributedMapCache processor to set an empty value for the cache key of the signal identifier you use in Wait/Notify.

Alternatively, you could somehow generate a new release signal identifier for each batch of files and use expression language in the Notify and Wait processors to reference a dynamic value instead of hard coding the identifier.

Upvotes: 2

Petr Kireev
Petr Kireev

Reputation: 41

To reset the Wait-Notify counter you should use NotifyProcessor with the property Signal Counter Delta set to zero. In your case, it makes sense to reset it at the very beginning of the pipeline.

If you have already put something to cache and started to receive errors from Wait and Notify processors, you should disable and then enable the DistributedMapCacheServer controller service. Be careful because this action resets all values in the cache.

Upvotes: 0

Dmytro Tsylyuryk
Dmytro Tsylyuryk

Reputation: 11

I had same problem. And solve is: in property Wait process use

Attribute Copy Mode = Replace if present

And in my case it work only when

Replace if present = total

Upvotes: 1

Ben Yaakobi
Ben Yaakobi

Reputation: 1668

A value, or the results of an Attribute Expression Language statement, which will be evaluated against a FlowFile in order to determine the signal counter delta. Specify how much the counter should increase. For example, if multiple signal events are processed at upstream flow in batch oriented way, the number of events processed can be notified with this property at once. Zero (0) has a special meaning, it clears target count back to 0, which is especially useful when used with Wait Releasable FlowFile Count = Zero (0) mode, to provide 'open-close-gate' type of flow control. One (1) can open a corresponding Wait processor, and Zero (0) can negate it as if closing a gate.

(The description of the Signal Counter Delta property of the Notify processor.) So you can just set counter delta to zero, trigger it, and there you have it.

Also, try using UpdateAttribute and update a counter every 12 flowfiles.

UpdateAttribute: Stateful Variables Initial Value=1 Store State=Store State Locally count=${getStateValue("count"):equals(13):ifElse('1', ${getStateValue("count"):plus(1)}}

=> RouteOnAttribute: verify=${count:equals(13)}

Matched => Reset using Notify(described above)

Unmatched => continue as usual

I do have to warn you though, that it will only work on one node(since the state of UpdateAttribute is stored locally on the node)

Upvotes: 0

Related Questions