Reputation: 2913
Is there a way to get the timestamp of the last execution of a PutSQL processor via the REST API? Does such a timestamp even exist, or can I build one myself somehow?
Setup: I use Airflow to trigger my NiFi ETL flow, which ends with a couple of PutSQL processors. After those are done, I need to execute something else in Airflow.
Idea: I want to trigger the first NiFi processor and then wait in Airflow until the last_execution_timestamp of the last PutSQL processor is updated.
Problem:
I tried accessing the attribute statsLastRefreshed, but it is not the last execution time. It is the last time anything (users or API requests) accessed the processor, which led NiFi to refresh the processor.
s = processor["status"]["statsLastRefreshed"] # '13:13:26 CEST'
I can't find anything in the NiFi REST API documentation.
The only other option I see is to query, from Airflow, the database table written by the last PutSQL processor to see whether anything new has arrived there.
Upvotes: 1
Views: 1019
Reputation: 1
You can use the updateProcessor
Upvotes: 0
Reputation: 2913
I figured out a workaround.
1. In a processor, add a custom property named mypropertyname with the value ${now()}. Any flowfile that passes through the processor will then carry the timestamp of when it passed through as an attribute.
2. Add an UpdateAttribute processor after the processor from step 1, with the option Store State (under processor properties) set to Store state locally.
3. Add a custom property to the UpdateAttribute processor with the name readable_property and the value ${'mypropertyname'}.
4. The state of the processor now contains the value from the last flowfile (i.e. the timestamp produced by the now() call in step 1).
5. Send a GET request to /nifi-api/processors/{id}/state (e.g. from Airflow). The returned JSON contains the following lines:
{
"key":"readable_property"
,"value":"Wed Apr 14 11:13:40 CEST 2021"
,"clusterNodeId":"some-id-0d8eb6052"
,"clusterNodeAddress":"some-host:port-number"
}
Then you just have to parse the JSON for the value in Airflow.
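A minimal Python sketch of that parsing step follows. It assumes the response has the shape of GET /nifi-api/processors/{id}/state, with entries like the one quoted above nested under componentState.localState.state (or componentState.clusterState.state). The helper names find_state_value and parse_nifi_date are hypothetical, and the date format is inferred from the single example value shown above.

```python
from datetime import datetime
from typing import Optional


def find_state_value(state_response: dict, key: str) -> Optional[str]:
    """Return the value stored under `key` in a processor-state response.

    Assumes entries live under componentState.localState.state and/or
    componentState.clusterState.state; returns None if the key is absent.
    """
    component_state = state_response.get("componentState", {})
    for scope in ("localState", "clusterState"):
        entries = (component_state.get(scope) or {}).get("state", [])
        for entry in entries:
            if entry.get("key") == key:
                return entry.get("value")
    return None


def parse_nifi_date(value: str) -> datetime:
    """Parse a value such as 'Wed Apr 14 11:13:40 CEST 2021'.

    The timezone abbreviation is dropped because strptime's %Z does not
    reliably accept names like CEST, so the result is a naive datetime.
    """
    parts = value.split()
    # parts: weekday, month, day, time, timezone abbreviation, year
    without_tz = " ".join(parts[:4] + parts[5:])
    return datetime.strptime(without_tz, "%a %b %d %H:%M:%S %Y")
```

In a cluster, each node may report its own entry for the same key; this sketch simply returns the first match, so you may prefer to collect all entries and keep the latest parsed timestamp.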
Note: There will be a slight delay between the moment the previous processor adds the attribute to the flowfile with now() and the moment the flowfile actually passes through the UpdateAttribute processor, from where you can read the timestamp.
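To get the "wait in Airflow" behaviour from the question, the state can be polled until the stored value changes. This is a sketch under stated assumptions: an unsecured NiFi instance reachable at a base URL you supply, the readable_property key used above, and hypothetical helper names (fetch_processor_state, wait_for_new_value). The fetch function is passed in as a callable so the polling loop can be tested without a live NiFi.

```python
import json
import time
import urllib.request
from typing import Callable, Optional


def fetch_processor_state(base_url: str, processor_id: str) -> dict:
    """GET {base_url}/nifi-api/processors/{id}/state and decode the JSON.

    Assumes an unsecured NiFi; a secured instance needs a token or client
    certificate, which this sketch does not handle.
    """
    url = f"{base_url}/nifi-api/processors/{processor_id}/state"
    with urllib.request.urlopen(url) as resp:
        return json.load(resp)


def _value_for(state: dict, key: str) -> Optional[str]:
    # Look only in the local state, where "Store state locally" keeps entries.
    local = state.get("componentState", {}).get("localState") or {}
    for entry in local.get("state", []):
        if entry.get("key") == key:
            return entry.get("value")
    return None


def wait_for_new_value(
    fetch: Callable[[], dict],
    key: str,
    previous: Optional[str],
    interval_s: float = 10.0,
    timeout_s: float = 3600.0,
) -> str:
    """Poll `fetch` until the value stored under `key` differs from `previous`."""
    deadline = time.monotonic() + timeout_s
    while True:
        current = _value_for(fetch(), key)
        if current is not None and current != previous:
            return current
        if time.monotonic() >= deadline:
            raise TimeoutError(f"state key {key!r} did not change within {timeout_s}s")
        time.sleep(interval_s)
```

In Airflow, you would read the current value before triggering the first NiFi processor, then call wait_for_new_value(lambda: fetch_processor_state(base_url, processor_id), "readable_property", previous) from a task, or adapt the check into the python_callable of a PythonSensor. Because the value changes as soon as the first flowfile reaches UpdateAttribute, a change only tells you that at least one flowfile got there, not that every PutSQL has finished.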
Upvotes: 3
Reputation: 547
NiFi is made for continuous data flows, so there is no last execution time in the processor stats.
Although NiFi is capable of ETL (though not full-fledged), I believe Airflow is more suitable for ETL processes than NiFi if your flows require only file and database access.
Integrating Airflow and NiFi for ETL tasks makes for a somewhat complex architecture. If possible, consider choosing one of them to avoid problems like the one you described here.
If your flows include many diverse inputs, complex logic, and large file ingestion, choosing Airflow alone wouldn't be easy.
Upvotes: 0