Reputation: 400
I have a flow in NiFi:
Download file -> UnpackContent -> PutHDFS
After all flowfiles are put into HDFS, I need to run a shell script.
How can I know or check that all flowfiles have been processed?
Upvotes: 5
Views: 4254
Reputation: 1659
@Антон Букреев
You should be able to inspect the flowfile attributes for fragment.index and fragment.count.
These values indicate which part of the unpacked series each unpacked flowfile is, and that is how you know they are done processing in HDFS. You would need a MergeContent or Wait/Notify step based on the count before using ExecuteScript. You can also access these attributes in ExecuteScript if you need to return metadata about the locations of the parts in HDFS. I recommend the latter, as you likely unpacked the results for good reason in the final step of the flow.
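For example, a minimal ExecuteScript sketch (Script Engine = python, i.e. Jython) that reads those attributes; the logging is illustrative, not a finished script:

    # ExecuteScript body (Jython). Reads the fragment attributes that
    # UnpackContent writes; 'session', 'log' and REL_SUCCESS are bound
    # variables provided by the ExecuteScript processor.
    flowFile = session.get()
    if flowFile is not None:
        index = flowFile.getAttribute('fragment.index')
        count = flowFile.getAttribute('fragment.count')
        name  = flowFile.getAttribute('filename')   # standard core attribute
        log.info('unpacked part {}/{}: {}'.format(index, count, name))
        session.transfer(flowFile, REL_SUCCESS)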
I have created an UnpackContent Demo template for you which you can find on my GitHub:
https://github.com/steven-dfheinz/NiFi-Templates/blob/master/UnpackContent_Demo.xml
Upvotes: 2
Reputation: 45722
The NiFi UnpackContent processor writes fragment.identifier and fragment.count attributes. These attributes can be handled automatically by the MergeContent processor with Merge Strategy = Defragment. So you may have the following flow:
UnpackContent -> PutHDFS -> AttributesToJSON -> MergeContent -> ...
AttributesToJSON is needed to drop the flowfile content, so that MergeContent does not pay a performance penalty for buffering it. MergeContent will automatically merge all flowfiles related to a single archive.
Alternatively, you may implement your own logic based on the fragment.identifier and fragment.count attributes with the Wait/Notify processors (a sketch of both setups follows below). But I suppose the MergeContent option would be easier for your case.
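For reference, a sketch of the relevant settings; the property names come from the standard processors, and the cache service is whatever DistributedMapCacheClientService you have configured:

    MergeContent:
        Merge Strategy            = Defragment

    Wait:
        Release Signal Identifier = ${fragment.identifier}
        Target Signal Count       = ${fragment.count}
        Distributed Cache Service = <your DistributedMapCacheClientService>

    Notify:
        Release Signal Identifier = ${fragment.identifier}
        Distributed Cache Service = <the same cache service>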
Upvotes: 1
Reputation: 159
There is no good way to do that, because NiFi is a stream-processing product: it continually reads files from one or more points, processes them, and sends them to another point.
So there is no such thing as "finished processing all the files" in NiFi, because the flow keeps running and waiting for new files.
What you can do is query the provenance repository, look at a specific flowfile, and check whether it finished the flow.
So what I suggest is:
If you know how many files you expect to process: query the provenance repository for how many files have finished the flow.
If you don't: query the last time a new file was written to HDFS, and if more than X seconds have passed, run the script (see the sketch after this list).
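A minimal sketch of that second option, run outside NiFi; the directory, threshold, and script name are assumptions:

    #!/usr/bin/env python3
    # Run a script once no new file has landed in HDFS for IDLE_SECONDS.
    import subprocess
    import time

    HDFS_DIR = "/data/landing"      # hypothetical PutHDFS target directory
    IDLE_SECONDS = 60               # the "X seconds" from the answer
    SCRIPT = "./post_process.sh"    # hypothetical shell script to run

    def last_mod_seconds(path):
        # 'hdfs dfs -stat %Y' prints modification time in ms since epoch;
        # a directory's mtime changes when a file is added to it.
        out = subprocess.check_output(["hdfs", "dfs", "-stat", "%Y", path])
        return int(out.decode().strip()) / 1000.0

    while True:
        if time.time() - last_mod_seconds(HDFS_DIR) > IDLE_SECONDS:
            subprocess.check_call([SCRIPT])
            break
        time.sleep(10)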
Upvotes: 2
Reputation: 36
NiFi provides a provenance event repository, which includes information about all the actions that happened to every flowfile within your cluster.
Read more about the provenance logs and how to use them here: https://nifi.apache.org/docs/nifi-docs/html/nifi-in-depth.html#provenance-repository
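As a concrete example, here is a sketch of querying the provenance repository through NiFi's REST API; the host and port are assumptions, and while the endpoint and field names follow the public API, verify the request schema against your NiFi version:

    # Submit an asynchronous provenance query and poll it until finished.
    import time
    import requests

    BASE = "http://localhost:8080/nifi-api"   # hypothetical NiFi instance

    resp = requests.post(BASE + "/provenance",
                         json={"provenance": {"request": {"maxResults": 1000}}})
    resp.raise_for_status()
    query = resp.json()["provenance"]
    uri = query["uri"]

    # Poll until the query has finished, then inspect the events.
    while not query["finished"]:
        time.sleep(1)
        query = requests.get(uri).json()["provenance"]

    for event in query["results"]["provenanceEvents"]:
        print(event["eventType"], event.get("componentName"))

    # Provenance queries hold server resources; delete when done.
    requests.delete(uri)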
Upvotes: 1