Apache NiFi: How can I know or check that all flow files have been processed?

I have a flow in NiFi:

Download file -> UnpackContent -> PutHDFS

After all the flowfiles have been put into HDFS, I need to run a shell script.

How can I know or check that all the flow files have been processed?

Upvotes: 5

Views: 4254

Answers (4)

steven-matison

Reputation: 1659

@Антон Букреев

You should be able to inspect the flowfile attributes for fragment.index and fragment.count.


These values indicate which part of the unpacked series each unpacked flowfile is, and that is how you know they are done processing in HDFS. Based on the count, you would need a MergeContent or a Wait/Notify step before using ExecuteScript. You can also access these attributes in ExecuteScript if you need to return metadata about the part locations in HDFS. I recommend the latter, since you likely unpacked the content for a good reason and will want those results in the final step of the flow.
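To make the counting idea concrete, here is a minimal sketch (plain Python, not a NiFi API; the `FragmentTracker` helper is hypothetical) of how you could track the `fragment.index` values seen per `fragment.identifier` and detect when a whole unpacked series has arrived:

```python
# Sketch of the completion check described above: remember which
# fragment.index values have been seen for each fragment.identifier,
# and report the group as done once all fragment.count pieces arrived.
# Attribute names match what UnpackContent writes on each flowfile.
from collections import defaultdict

class FragmentTracker:
    def __init__(self):
        self._seen = defaultdict(set)

    def record(self, attributes):
        """Record one flowfile's attributes; return True once every
        fragment of its archive has been observed."""
        ident = attributes["fragment.identifier"]
        count = int(attributes["fragment.count"])
        self._seen[ident].add(int(attributes["fragment.index"]))
        return len(self._seen[ident]) == count
```

You would call `record()` once per flowfile (for example from an ExecuteScript after PutHDFS) and trigger the shell script when it returns True.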

I have created an UnpackContent Demo template for you which you can find on my GitHub:

https://github.com/steven-dfheinz/NiFi-Templates/blob/master/UnpackContent_Demo.xml

Upvotes: 2

VB_

Reputation: 45722

The NiFi UnpackContent processor writes fragment.identifier and fragment.count attributes. These attributes can be handled automatically by the NiFi MergeContent processor with Merge Strategy = Defragment. So you may have the following flow:

UnpackContent -> PutHDFS -> AttributesToJSON -> MergeContent -> ... 

AttributesToJSON is needed to drop the flow file content, so that MergeContent does not pay a performance penalty for merging large payloads.

MergeContent will automatically merge all flow files related to a single archive.

Alternatively, you may implement your own logic based on the fragment.identifier and fragment.count attributes with the Wait/Notify processors, but I suppose the MergeContent option would be easier in your case.
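The Wait/Notify alternative can be sketched as a signal counter: Notify increments a counter keyed by fragment.identifier, and Wait releases once the counter reaches fragment.count. The plain dict below stands in for NiFi's distributed cache; the `SignalCache` class and method names are hypothetical, not part of NiFi:

```python
# Hedged sketch of the Wait/Notify counting logic: each successful
# PutHDFS sends one Notify signal for its archive, and the Wait side
# passes through once the signal count reaches fragment.count.
class SignalCache:
    def __init__(self):
        self._counters = {}

    def notify(self, identifier):
        # One Notify signal per flowfile that finished PutHDFS.
        self._counters[identifier] = self._counters.get(identifier, 0) + 1

    def wait_released(self, identifier, target_count):
        # Wait releases the flowfile once all expected signals arrived.
        return self._counters.get(identifier, 0) >= int(target_count)
```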

Upvotes: 1

Bigicecream

Reputation: 159

There is no good way to do that, because NiFi is a stream processing product: it continually reads files from one or more sources, processes them, and sends them on to another endpoint.

So there is no such thing as finishing processing all the files in NiFi, because the flow keeps running and waiting for new files.

What you can do is query the provenance repository, look at a specific flowfile, and check whether it has finished the flow.

So what I suggest is:

If you know how many files you expect to process: query the provenance repository for how many files have finished the flow.

If you don't: query for the last time a new file was written to HDFS, and if more than X seconds have passed, run the script.
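The second suggestion boils down to an idle-timeout check. A minimal sketch (the timestamp would come from a provenance query or an HDFS listing; here it is just an epoch value, and `should_run_script` is a hypothetical helper name):

```python
# Decide whether to run the shell script: fire only once no new file
# has been written to HDFS for at least `idle_seconds`.
import time

def should_run_script(last_write_epoch, idle_seconds, now=None):
    """Return True once idle_seconds have elapsed since the last write."""
    now = time.time() if now is None else now
    return (now - last_write_epoch) >= idle_seconds
```

You would poll this periodically and trigger the script on the first True.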

Upvotes: 2

mrliberman

Reputation: 36

NiFi provides a provenance event repository which records information about every action that happened to every flowfile in your cluster.

Read more about the provenance logs and how to use them here: https://nifi.apache.org/docs/nifi-docs/html/nifi-in-depth.html#provenance-repository

Upvotes: 1
