Cribber

Reputation: 2913

Get last execution timestamp of a Nifi PutSQL processor

Is there a way to get the timestamp of the last execution of a PutSQL processor via the REST-API? Does such a timestamp even exist or can I build one myself somehow?

Setup: I have Airflow to trigger my Nifi-ETL which ends with a couple of PutSQL processors - after those are done I need to execute something else in Airflow.

Idea: I want to trigger the first Nifi processor and then wait in Airflow until the last_execution_timestamp of the last PutSQL processor is updated.

Problem: I tried accessing the attribute statsLastRefreshed, but it is not the last execution time. It is the last time anything (users / API requests) accessed the processor, which caused Nifi to refresh the processor's stats.

s = processor["status"]["statsLastRefreshed"]  # '13:13:26 CEST'

I can't find anything in the REST API documentation of Nifi.

The only other option I see is to query, from Airflow, the database table that the last PutSQL processor writes to and check whether anything new has arrived there.

Upvotes: 1

Views: 1019

Answers (3)

Fuzolan

Reputation: 1

You can use the UpdateAttribute processor:

  • set the initial value to ${now()}
  • set Store State to Store state locally
  • add a property lastExecution with value = ${getStateValue("currentExecution")}
  • add a property currentExecution with value = ${now()}
  • be happy :-)

Upvotes: 0

Cribber

Reputation: 2913

I figured out a work-around solution.

  1. In a processor add a custom property named mypropertyname with the value ${now()}

  2. Any flowfile that passes through the processor will have the timestamp of when it passed through the processor as an attribute!

  3. Have an UpdateAttribute processor after the processor from step 1 with the option (under processor properties) Store State set to Store state locally.

  4. Add a custom property in the UpdateAttribute processor with the name readable_property and set it to the value ${'mypropertyname'}.

The state of the processor now contains the value from the last flowfile that passed through (i.e. the timestamp produced by the now() call from step 1).

  5. Get the state of the stateful processor (and hence the value from the last flowfile that passed through it) via the REST API with a GET on the URI /nifi-api/processors/{id}/state (e.g. from Airflow).

The JSON which gets returned contains the following lines:

{
  "key": "readable_property",
  "value": "Wed Apr 14 11:13:40 CEST 2021",
  "clusterNodeId": "some-id-0d8eb6052",
  "clusterNodeAddress": "some-host:port-number"
}

Then you just have to parse the JSON for the value in Airflow.
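A minimal sketch of that parsing step, in case it helps. The function name find_state_value is made up for illustration, and the outer keys in the sample (componentState, localState, state) are an assumption about the response envelope; only the inner entry is taken from the output shown above. The search is recursive, so it does not depend on the exact nesting.

```python
from typing import Any, Optional


def find_state_value(payload: Any, key: str) -> Optional[str]:
    """Return the "value" of the first state entry whose "key" equals `key`.

    Walks nested dicts and lists, so the exact envelope around the
    state entries does not matter. Returns None if nothing matches.
    """
    if isinstance(payload, dict):
        if payload.get("key") == key and "value" in payload:
            return payload["value"]
        children = list(payload.values())
    elif isinstance(payload, list):
        children = payload
    else:
        return None
    for child in children:
        found = find_state_value(child, key)
        if found is not None:
            return found
    return None


# Assumed response shape (outer keys are hypothetical); the inner
# entry mirrors the JSON lines quoted in this answer.
sample = {
    "componentState": {
        "localState": {
            "state": [
                {
                    "key": "readable_property",
                    "value": "Wed Apr 14 11:13:40 CEST 2021",
                }
            ]
        }
    }
}

print(find_state_value(sample, "readable_property"))
```

In Airflow you would pass the decoded JSON body of the GET request instead of sample.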

Note: There will be a slight delay between the previous processor adding the attribute to the flowfile with now() and the flowfile actually passing through the UpdateAttribute processor, from which you read the timestamp.
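To wait in Airflow until the stored timestamp changes, a polling loop along these lines could work. This is only a sketch: wait_for_change and its parameters are names I made up, and fetch stands for whatever callable you write that does the GET on the state URI and returns the current value (or None if there is no state yet).

```python
import time
from typing import Callable, Optional


def wait_for_change(
    fetch: Callable[[], Optional[str]],
    previous: Optional[str],
    timeout_s: float = 600.0,
    interval_s: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll `fetch` until it returns a value different from `previous`.

    Raises TimeoutError if no new value shows up within `timeout_s`.
    `sleep` is injectable so the loop can be tested without waiting.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        current = fetch()
        # A new, non-empty value means a flowfile passed through
        # the stateful processor since `previous` was read.
        if current is not None and current != previous:
            return current
        if time.monotonic() >= deadline:
            raise TimeoutError("state value did not change within the timeout")
        sleep(interval_s)
```

Read the value once before triggering the first Nifi processor and pass it as previous. The values are compared as plain strings, so the timestamp format does not need to be parsed. Keep in mind that the state changes with every flowfile, so a changed value only tells you that at least one flowfile has arrived, not that the whole batch is done.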

Upvotes: 3

yaprak

Reputation: 547

Nifi is made for continuous data flows. So there is no last execution time in Processor stats.

Although Nifi is capable of ETL (though not full-fledged ETL), I believe Airflow is better suited to ETL processes than Nifi if your flows only require file and database access.

Integrating Airflow and Nifi for ETL tasks makes for a somewhat complex architecture. If possible, consider choosing one of them to avoid problems like the one you described here.

If your flows include lots of diverse inputs, complex logic, and ingestion of large files, using Airflow alone wouldn't be easy.

Upvotes: 0
