Cribber

Reputation: 2913

Read flow file attribute/content to processor property

I want to set a property of a processor based on the contents of the last flowfile that came through.

Example: I create the flowfile with a GenerateFlowFile processor, using the custom text ${now()} so that its content is the timestamp at which the flowfile was created.

I want a processor (which kind doesn't matter to me) to read the content of the flowfile (the timestamp) into the processor's custom property property_name. Afterwards I want to be able to query the processor via the REST API and read that property from it.

Initially I thought I could do this with the ExtractText processor, but it extracts text based on a regex and writes it back to the flowfile, whereas I want to keep that information in the processor until the next flowfile arrives.

Upvotes: 0

Views: 3555

Answers (3)

Cribber

Reputation: 2913

Thanks to @Ivan I was able to put together a fully working solution. For future reference:

  1. Create flowfiles with, e.g., a GenerateFlowFile processor and add a custom property "myproperty" with the value ${now()}. (Note: you can add this attribute to the flowfiles in any processor; it doesn't have to be a GenerateFlowFile processor.)

  2. Add an UpdateAttribute processor and set its Store State property (under the processor properties) to Store state locally.

  3. Add a custom property to the UpdateAttribute processor named readable_property and set its value to ${'myproperty'}.

The processor's state now holds the attribute value from the most recent flowfile (here, the timestamp at which the attribute was added to that flowfile).

Added Bonus:

  1. Retrieve the value from the stateful processor (and hence the value from the last flowfile that passed through!) via the REST API with a GET on the URI /nifi-api/processors/{id}/state

The returned JSON contains an entry like this:

{
  "key": "readable_property",
  "value": "Wed Apr 14 11:13:40 CEST 2021",
  "clusterNodeId": "some-id-0d8eb6052",
  "clusterNodeAddress": "some-host:port-number"
}

Then you just have to parse the JSON for the value.
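A minimal Python sketch of that last step, using only the standard library. It assumes an unsecured NiFi instance reachable over plain HTTP (no TLS client certificate or bearer token), and that the state entries sit under componentState, then localState, then state in the response body. The function names are hypothetical helpers, not part of NiFi.

```python
import json
import urllib.request
from typing import Any, Dict, Optional


def fetch_processor_state(base_url: str, processor_id: str) -> Dict[str, Any]:
    """GET /nifi-api/processors/{id}/state and decode the JSON body.

    Assumes an unsecured NiFi instance: no TLS client certs, no bearer token.
    """
    url = f"{base_url.rstrip('/')}/nifi-api/processors/{processor_id}/state"
    with urllib.request.urlopen(url) as resp:
        return json.load(resp)


def get_state_value(
    state_response: Dict[str, Any], key: str, scope: str = "localState"
) -> Optional[str]:
    """Return the value stored under `key`, or None if there is no such entry.

    Assumes the entries live under componentState -> <scope> -> state;
    UpdateAttribute stores state locally, hence the "localState" default.
    """
    component_state = state_response.get("componentState") or {}
    entries = (component_state.get(scope) or {}).get("state") or []
    for entry in entries:
        if entry.get("key") == key:
            return entry.get("value")
    return None
```

Usage, with a placeholder host and processor id: get_state_value(fetch_processor_state("http://localhost:8080", "<processor-id>"), "readable_property") should return the timestamp string that the UpdateAttribute processor stored from the last flowfile.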

Upvotes: 0

Ivan Klimenko

Reputation: 26

You can't do that in NiFi: while a processor is running, you can't update its configuration.

Maybe you can use state variables in UpdateAttribute? From the UpdateAttribute documentation:

Stateful Usage

By selecting the "store state locally" option for the "Store State" property, UpdateAttribute will not only store the evaluated properties as attributes of the FlowFile but also as stateful variables to be referenced in a recursive fashion. This enables the processor to calculate things like the sum or count of incoming FlowFiles. A dynamic property can be referenced as a stateful variable like so:

  Dynamic Property    key: theCount    value: ${getStateValue("theCount"):plus(1)}

This example will keep a count of the total number of FlowFiles that have passed through the processor. To use logic on top of State, simply use the "Advanced Usage" of UpdateAttribute. All Actions will be stored as stateful attributes as well as being added to FlowFiles. Using the "Advanced Usage" it is possible to keep track of things like a maximum value of the flow so far. This would be done by having a condition of "${getStateValue("maxValue"):lt(${value})}" and an action of attribute:"maxValue", value:"${value}".

The "Stateful Variables Initial Value" property is used to initialize the stateful variables and is required to be set if running statefully. Some logic rules will require a very high initial value, like using the Advanced rules to determine the minimum value.

If stateful properties reference other stateful properties, then the value for the other stateful properties will be an iteration behind. For example, attempting to calculate the average of the incoming stream requires the sum and count. If all three properties are set in the same UpdateAttribute (like below), then the Average will never include the most recent values of count and sum:

  Count      key: theCount      value: ${getStateValue("theCount"):plus(1)}
  Sum        key: theSum        value: ${getStateValue("theSum"):plus(${flowfileValue})}
  Average    key: theAverage    value: ${getStateValue("theSum"):divide(${getStateValue("theCount")})}

Instead, since the average only relies on the theCount and theSum attributes (which are added to the FlowFile as well), there should be a following stateless UpdateAttribute which properly calculates the average.

In the event that the processor is unable to get the state at the beginning of the onTrigger, the FlowFile will be pushed back to the originating relationship and the processor will yield. If the processor is able to get the state at the beginning of the onTrigger but unable to set the state after adding attributes to the FlowFile, the FlowFile will be transferred to "set state fail". This is normally due to the state not being the most up-to-date version (another thread has replaced the state with another version). In most use cases this relationship should loop back to the processor, since the only affected attributes will be overwritten.

Note: Currently the only "stateful" option is to store state locally. This is done because the current implementation of clustered state relies on Zookeeper, and Zookeeper isn't designed for the type of load/throughput UpdateAttribute with state would demand. In the future, if/when multiple different clustered state options are added, UpdateAttribute will be updated.
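To make the "one iteration behind" effect concrete, here is a small Python model of the Count, Sum and Average properties plus the maxValue rule. It is not NiFi's implementation; it only assumes, as the documentation describes, that every getStateValue(...) in one pass reads the state as it was before the current FlowFile, and that "Stateful Variables Initial Value" is 0. The function names are hypothetical, and stateless_average stands in for the suggested downstream stateless UpdateAttribute.

```python
from typing import Dict, List, Tuple

State = Dict[str, float]


def update_attribute_stateful(state: State, value: float) -> Tuple[State, State]:
    """Model one FlowFile passing through a stateful UpdateAttribute.

    All properties are evaluated against the state snapshot taken before
    this FlowFile, so properties that read other stateful properties lag.
    Returns (new_state, attributes_added_to_flowfile).
    """
    old = dict(state)  # snapshot read at the start of onTrigger
    attrs: State = {
        "theCount": old["theCount"] + 1,
        "theSum": old["theSum"] + value,
        # Reads the *old* sum and count, so it trails by one FlowFile.
        # The zero-count guard is this sketch's choice, not NiFi behaviour.
        "theAverage": old["theSum"] / old["theCount"] if old["theCount"] else 0.0,
    }
    # "Advanced Usage" rule: condition getStateValue("maxValue") < value,
    # action maxValue = value. Actions are stored as state and as attributes.
    if old["maxValue"] < value:
        attrs["maxValue"] = value
    return {**old, **attrs}, attrs


def stateless_average(attrs: State) -> float:
    """Downstream stateless step: uses the FlowFile's own, current attributes."""
    return attrs["theSum"] / attrs["theCount"]


def run(values: List[float]) -> Tuple[State, List[Tuple[float, float]]]:
    """Feed values through the model; collect (lagging, correct) averages."""
    # Every stateful variable starts at the initial value 0.
    state: State = {"theCount": 0, "theSum": 0, "maxValue": 0}
    rows = []
    for v in values:
        state, attrs = update_attribute_stateful(state, v)
        rows.append((attrs["theAverage"], stateless_average(attrs)))
    return state, rows
```

For example, run([10, 20, 30]) yields the rows (0.0, 10.0), (10.0, 15.0), (15.0, 20.0): the average computed inside the stateful processor always trails by one FlowFile, while the downstream stateless calculation matches the true running average.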

Upvotes: 1

Ivan Klimenko

Reputation: 26

You should use the UpdateAttribute processor. You can read about several methods, e.g. "Update attributes based on content in NiFi".

Upvotes: -1
