Tomas Lukac

Apache NiFi Wait select FlowFile by attribute

I am creating a flow for processing data from multiple sources (same platform, different customers). Each FlowFile is generated by triggering the HandleHttpRequest processor. I can only process one file at a time for a given customer. The process is also asynchronous (I keep looping until the API responds that the process has finished).

What I have right now is a Wait/Notify flow: after one FlowFile is processed, Wait releases another file to process. However, this only works for one customer. What I want is either a dynamic number of Wait processors, or a single Wait processor that can release FlowFiles conditionally (by attribute).

Example:

I have customers A and B. Each generates FlowFiles with the attribute

customer: ${cust_name}

These FlowFiles are held in the Wait processor, waiting for a notification from the Notify processor. The order of these files is unknown (although the files for one customer are always sorted), so the queue can look like this: A3 B3 A2 A1 B2 B1. What I want is to notify the Wait processor to release the next A element or the next B element, selected by attribute.
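To make the requested behavior concrete outside NiFi, here is a minimal Python sketch of the selection rule: given a signal for one customer, release that customer's next FlowFile and leave everything else in place. It is only a model, not NiFi code; the `FlowFile` class, its `seq` field (read off names like A1) and `release_next` are hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class FlowFile:
    # Hypothetical stand-in for a NiFi FlowFile, keeping only what we need.
    customer: str  # the "customer" attribute from the question
    seq: int       # hypothetical per-customer sequence number (the 1 in "A1")


def release_next(queue: list[FlowFile], customer: str) -> FlowFile | None:
    """Remove and return the lowest-sequence FlowFile for `customer`, or None."""
    candidates = [ff for ff in queue if ff.customer == customer]
    if not candidates:
        return None  # nothing waiting for this customer
    nxt = min(candidates, key=lambda ff: ff.seq)
    queue.remove(nxt)  # other customers' FlowFiles keep their positions
    return nxt


# The mixed queue from the question: A3 B3 A2 A1 B2 B1
queue = [FlowFile("A", 3), FlowFile("B", 3), FlowFile("A", 2),
         FlowFile("A", 1), FlowFile("B", 2), FlowFile("B", 1)]
print(release_next(queue, "A"))  # FlowFile(customer='A', seq=1)
print(release_next(queue, "B"))  # FlowFile(customer='B', seq=1)
```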

Is something like this possible?

Answers (1)

Tomas Lukac

I found a solution to what I wanted to achieve! I have a Wait processor accepting files with a customer attribute whose value is either A or B.

The files then loop back into the Wait processor through its wait relationship.

The order in which these files enter the wait queue is always the same. The Wait processor always looks only at the first entry in the queue, and that's it.

To achieve this perpetual cycling of FlowFiles, you need to configure the wait queue with the FirstInFirstOutPrioritizer.

However, this will not guarantee that Wait processor will release the oldest FlowFile, because the wait queue is always changing.
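The head-of-line behavior described above can be modeled in a few lines of Python. This is a simplified sketch, not the Wait processor's actual implementation: it assumes each trigger inspects only the head of the queue, a FlowFile that is not released is routed to wait and re-enters at the tail (as with FirstInFirstOutPrioritizer), and the hypothetical `signals` dict stands in for Notify's per-customer signal counts. FlowFiles are plain strings whose first character is the customer, and `run_fifo_cycle` is a hypothetical name.

```python
from __future__ import annotations

from collections import deque


def run_fifo_cycle(queue: deque[str], signals: dict[str, int], triggers: int) -> list[str]:
    """Simplified model: each trigger looks only at the head of the wait queue."""
    released: list[str] = []
    for _ in range(triggers):
        if not queue:
            break
        ff = queue.popleft()        # head of the FIFO wait queue
        customer = ff[0]            # e.g. "A" for "A1"
        if signals.get(customer, 0) > 0:
            signals[customer] -= 1  # consume one Notify signal
            released.append(ff)
        else:
            queue.append(ff)        # routed to 'wait', loops back to the tail
    return released


queue = deque(["A1", "B1", "A2", "B2", "A3", "B3"])  # arrival order, head on the left
run_fifo_cycle(queue, {}, 2)               # no signals yet: A1 and B1 cycle to the tail
print(list(queue))                         # ['A2', 'B2', 'A3', 'B3', 'A1', 'B1']
print(run_fifo_cycle(queue, {"A": 1}, 1))  # ['A2'], released before the older A1
```

Which A gets released depends on where the rotation happens to be when the signal arrives, which is why the FIFO prioritizer alone does not guarantee oldest-first release.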

But there is a solution for this. The Wait processor has a Wait Penalty Duration property, which skips the first file in the queue if it does not match the signal, then the second, the third, and so on, until the desired oldest file is found (or the penalty expires). You can find the whole discussion here https://github.com/apache/nifi/pull/3540
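For contrast with the rotating queue, here is a second simplified Python model of the skipping behavior described above. Again this is not the Wait processor's real code: it assumes FlowFiles without a matching signal are skipped in place (standing in for a penalized signal identifier) instead of being moved to the tail, so the first match found is that customer's oldest FlowFile. Penalty expiry is not modeled, and `release_with_skip` is a hypothetical name.

```python
from __future__ import annotations


def release_with_skip(queue: list[str], signals: dict[str, int]) -> list[str]:
    """Scan the queue head-to-tail once; FlowFiles whose customer has no pending
    signal are skipped (as if penalized) and keep their position."""
    released: list[str] = []
    remaining: list[str] = []
    for ff in queue:
        customer = ff[0]                 # e.g. "A" for "A1"
        if signals.get(customer, 0) > 0:
            signals[customer] -= 1       # consume one Notify signal
            released.append(ff)
        else:
            remaining.append(ff)         # skipped, not moved to the tail
    queue[:] = remaining
    return released


queue = ["A1", "B1", "A2", "B2", "A3", "B3"]  # arrival order, head on the left
print(release_with_skip(queue, {"A": 1}))     # ['A1']
print(queue)                                  # ['B1', 'A2', 'B2', 'A3', 'B3']
```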

It works with Run Schedule set to 0 and the wait queue left at its default settings.