Reputation: 1076
I am currently getting files from FTP in Nifi, but I have to check some conditions before I fetch the file. The scenario goes some thing like this.
List FTP -> Check Condition -> Fetch FTP
In the Check Condition part, I have fetch some values from DB and compare with the file name. So can I use update attribute to fetch some records from DB and make it like this?
List FTP -> Update Attribute (from DB) -> Route on Attribute -> Fetch FTP
Upvotes: 2
Views: 2731
Reputation: 4132
So if I understand your use case correctly, it is like you are using the external DB only for tracking purpose. So I guess only the latest processed timestamp is enough. In that case, I would suggest you to use DistributedCache
processors and ControllerServices offered by NiFi instead of relying on an external DB.
With this method, your flow would be like:
ListFile --> FetchDistributedMapCache --(success)--> RouteOnAttribute -> FetchFile
Configure FetchDistributedMapCache
lastProcessedTime
latestTimestamp
or lastProcessedTime
Configure RouteOnAttribute
Create a new dynamic relationship by clicking the (+) button in the Properties
tab. Give it a name, like success
or matches
. Let's assume, your filenames are of the format somefile_1534824139
i.e. it has a name and an _
and the epoch timestamp appended.
In such case, you can leverage NiFi Expression Language
and make use of the functions it offer. So for the new dynamic relation, you can have an expression like:
${filename:substringAfter('_'):gt(${lastProcessedTimestamp})}
This is with the assumption that, in FetchDistributedMapCache
, you have configured the property Put Cache Value In Attribute
with the value lastProcessedTimestamp
.
Useful Links
Upvotes: 1
Reputation: 31460
I think your flow looks something like below
Flow:
1.ListFTP //to list the files
2.ExecuteSQL //to execute query in db(sample query:select max(timestamp) db_time from table)
3.ConvertAvroToJson //convert the result of executesql to json format
4.EvaluateJsonPath //keep destination as FlowfileAttribute and add new property as db_time as $.db_time
5.ROuteOnAttribute //perform check filename timestamp vs extracted timestamp by using nifi expresson language
6.FetchFile //if condition is true then fetch the file
RouteOnAttribute Configs:
I have assumed filename is something like fn_2017-08-2012:09:10 and executesql has returned 2017-08-2012:08:10
Expression:
${filename:substringAfter('_'):toDate("yyyy-MM-ddHH:mm:ss"):toNumber()
:gt(${db_time:toDate("yyyy-MM-ddHH:mm:ss"):toNumber()})}
By using above expression we are having filename value same as ListFTP
filename and db_time
attribute is added by using EvaluateJsonPath
processor and we are changing the time stamp to number then comparing.
Refer to this link for more details regards to NiFi expression language.
Upvotes: 1