Reputation: 642
I have a flow that has to wait for exactly 12 files on each run,
so I built it like this:
GetHDFS -> Notify -> Wait -> some processing.
Notify has the following config:
Release Signal Identifier = eredee_45rgfyWQQWQ
Signal Counter Name = cntr_for_run
Signal Counter Delta = 1
Signal Buffer Count = 1
Wait has the following config:
Release Signal Identifier = eredee_45rgfyWQQWQ
Target Signal Count = 12
Signal Counter Name = cntr_for_run
Wait Buffer Count = 1
Releasable FlowFile Count = 1
This works the first time: files are released only once a total of 12 have been read. But as soon as the 13th file arrives, it passes straight through. I want the next release to happen only when the total count reaches 24 (that is, after the next 12 files). How can I reset the running counter back to zero?
Upvotes: 2
Views: 2996
Reputation: 18670
I haven't tried this, but you could use the PutDistributedMapCache processor to set an empty value for the cache key of the signal identifier you use in Wait/Notify.
Alternatively, you could somehow generate a new release signal identifier for each batch of files and use expression language in the Notify and Wait processors to reference a dynamic value instead of hard coding the identifier.
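To illustrate the second idea, here is a minimal Python sketch of counting per batch rather than per flow. It is a toy model, not NiFi code: `batch_key`, `signal_id_for`, `notify`, and `batch_complete` are hypothetical names, and it assumes every file carries some value (a run date, a directory name) that identifies its batch.

```python
from collections import defaultdict

BATCH_SIZE = 12  # files expected per batch, as in the question

# signal identifier -> number of notifications seen so far
signal_counts = defaultdict(int)


def signal_id_for(batch_key):
    """Derive a per-batch signal identifier (hypothetical naming scheme)."""
    return f"batch_{batch_key}"


def notify(batch_key):
    """Count one arrived file against its own batch's identifier."""
    signal_id = signal_id_for(batch_key)
    signal_counts[signal_id] += 1
    return signal_counts[signal_id]


def batch_complete(batch_key):
    """True once the batch's own counter has reached BATCH_SIZE."""
    return signal_counts[signal_id_for(batch_key)] >= BATCH_SIZE
```

Because each batch gets its own identifier, a new batch starts counting from zero and nothing has to be reset.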
Upvotes: 2
Reputation: 41
To reset the Wait/Notify counter, use a Notify processor with the Signal Counter Delta property set to zero.
In your case, it makes sense to reset it at the very beginning of the pipeline.
If you have already put something in the cache and started to receive errors from the Wait and Notify processors, disable and then re-enable the DistributedMapCacheServer controller service. Be careful, because this resets all values in the cache.
Upvotes: 0
Reputation: 11
I had the same problem. The solution is to set this property on the Wait processor:
Attribute Copy Mode = Replace if present
In my case it worked only when
Replace if present = total
Upvotes: 1
Reputation: 1668
A value, or the results of an Attribute Expression Language statement, which will be evaluated against a FlowFile in order to determine the signal counter delta. Specify how much the counter should increase. For example, if multiple signal events are processed at upstream flow in batch oriented way, the number of events processed can be notified with this property at once. Zero (0) has a special meaning, it clears target count back to 0, which is especially useful when used with Wait Releasable FlowFile Count = Zero (0) mode, to provide 'open-close-gate' type of flow control. One (1) can open a corresponding Wait processor, and Zero (0) can negate it as if closing a gate.
(This is the description of the Signal Counter Delta property of the Notify processor.)
So you can just set counter delta to zero, trigger it, and there you have it.
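As a rough illustration, here is a toy Python model of the counter behavior described in the property text and in the question. It is not NiFi's implementation: the `SignalCache` class and its methods are made up for this sketch, and it assumes the release check is simply "count has reached the target".

```python
class SignalCache:
    """Toy stand-in for the cache entry shared by Notify and Wait."""

    def __init__(self):
        # signal identifier -> {counter name -> current count}
        self.counters = {}

    def notify(self, signal_id, counter_name, delta):
        """Apply a counter delta; a delta of 0 clears the count back to 0."""
        counts = self.counters.setdefault(signal_id, {})
        if delta == 0:
            counts[counter_name] = 0
        else:
            counts[counter_name] = counts.get(counter_name, 0) + delta
        return counts[counter_name]

    def is_released(self, signal_id, counter_name, target):
        """Assumed release check: has the count reached the target?"""
        count = self.counters.get(signal_id, {}).get(counter_name, 0)
        return count >= target
```

In this model, 12 calls with a delta of 1 open the gate, a 13th call leaves it open (which matches what the question describes), and a single call with a delta of 0 closes it again.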
Also, try using UpdateAttribute and update a counter every 12 flowfiles.
UpdateAttribute:
Stateful Variables Initial Value = 1
Store State = Store State Locally
count = ${getStateValue("count"):equals(13):ifElse('1', ${getStateValue("count"):plus(1)})}
=> RouteOnAttribute:
verify = ${count:equals(13)}
Matched => reset using Notify (described above)
Unmatched => continue as usual
I do have to warn you, though, that this only works on one node, since the state of UpdateAttribute is stored locally on that node.
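If you want to check which flowfiles would hit the Matched route before wiring this up, the counter logic can be simulated in a few lines of Python. This is only a sketch under assumptions: `matched_positions` is a hypothetical helper, and it assumes the initial value is used only while no state has been stored and that the computed `count` is written back to state after every flowfile.

```python
def matched_positions(initial, limit, total_files):
    """Return the 1-based positions of flowfiles routed to Matched."""
    state = None  # nothing stored before the first flowfile
    hits = []
    for position in range(1, total_files + 1):
        # getStateValue("count"): stored value, or the initial value if none
        current = initial if state is None else state
        # equals(limit):ifElse('1', plus(1))
        count = 1 if current == limit else current + 1
        state = count  # assumed: the new value is stored back to state
        if count == limit:  # RouteOnAttribute: verify
            hits.append(position)
    return hits


as_posted = matched_positions(initial=1, limit=13, total_files=40)
steady = matched_positions(initial=0, limit=12, total_files=40)
```

Under this model, the values as posted (initial value 1, limit 13) match on the 12th flowfile and then on every 13th after that (12, 25, 38), while an initial value of 0 with a limit of 12 matches on every 12th (12, 24, 36). It is worth checking on a test flow which of the two your setup actually needs.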
Upvotes: 0