Apurba Pandey

Reputation: 1076

NiFi: add attribute from DB

I am currently getting files from FTP in NiFi, but I have to check some conditions before I fetch each file. The scenario goes something like this.

List FTP -> Check Condition -> Fetch FTP

In the Check Condition part, I have to fetch some values from a DB and compare them with the file name. Can I use UpdateAttribute to fetch some records from the DB, so that the flow looks like this?

List FTP -> Update Attribute (from DB) -> Route on Attribute -> Fetch FTP

Upvotes: 2

Views: 2731

Answers (2)

Sivaprasanna Sethuraman

Reputation: 4132

If I understand your use case correctly, you are using the external DB only for tracking purposes, so the latest processed timestamp is all you need. In that case, I would suggest using the DistributedMapCache processors and controller services offered by NiFi instead of relying on an external DB.

With this method, your flow would be like:

ListFTP -> FetchDistributedMapCache --(success)--> RouteOnAttribute -> FetchFTP

Configure FetchDistributedMapCache

  • Cache Entry Identifier - This is the key for your cache entry. Set it to something like lastProcessedTime
  • Put Cache Value In Attribute - Whatever name you give here will be added as a FlowFile attribute whose value is the cached value. Provide a name, like lastProcessedTimestamp

Configure RouteOnAttribute

Add a new dynamic property by clicking the (+) button in the Properties tab; its name becomes a relationship. Give it a name, like success or matches. Let's assume your filenames have the format somefile_1534824139, i.e. a name, an underscore, and an epoch timestamp.

In that case, you can use the NiFi Expression Language and the functions it offers. For the new dynamic property, you can have an expression like:

  • success - ${filename:substringAfter('_'):gt(${lastProcessedTimestamp})}

This assumes that, in FetchDistributedMapCache, you have set the property Put Cache Value In Attribute to lastProcessedTimestamp.
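To make the check concrete, here is a minimal Python sketch of the comparison that the expression performs. It is only an illustration of the logic, not NiFi code: the function name is_newer and the sample values are hypothetical, and it assumes filenames shaped like somefile_1534824139 with nothing after the timestamp.

```python
def is_newer(filename: str, last_processed_timestamp: str) -> bool:
    """Rough equivalent of ${filename:substringAfter('_'):gt(${lastProcessedTimestamp})}."""
    # Take everything after the first '_' (the epoch suffix in the assumed format).
    _, separator, suffix = filename.partition("_")
    if not separator:
        # No '_' in the name: this sketch simply treats it as "no match".
        return False
    try:
        # Compare both values as numbers, as gt() does.
        return int(suffix) > int(last_processed_timestamp)
    except ValueError:
        # Non-numeric suffix or cache value: treated as "no match" in this sketch.
        return False


if __name__ == "__main__":
    # Hypothetical values: a listed file and a cached timestamp.
    print(is_newer("somefile_1534824139", "1534820000"))
```

Only files whose suffix is greater than the cached value would be routed to the success relationship and then fetched.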


Upvotes: 1

notNull

Reputation: 31460

I think your flow should look something like the one below.

Flow:

1. ListFTP // lists the files
2. ExecuteSQL // runs the query against the DB (sample query: select max(timestamp) db_time from table)
3. ConvertAvroToJSON // converts the ExecuteSQL result to JSON
4. EvaluateJsonPath // set Destination to flowfile-attribute and add a new property db_time with the value $.db_time
5. RouteOnAttribute // compares the filename timestamp with the extracted timestamp using the NiFi Expression Language
6. FetchFTP // fetches the file if the condition is true


RouteOnAttribute Configs:

I have assumed that the filename is something like fn_2017-08-2012:09:10 and that ExecuteSQL has returned 2017-08-2012:08:10.

Expression:

${filename:substringAfter('_'):toDate("yyyy-MM-ddHH:mm:ss"):toNumber()
:gt(${db_time:toDate("yyyy-MM-ddHH:mm:ss"):toNumber()})}

In the expression above, filename is the attribute set by ListFTP, and db_time is the attribute added by the EvaluateJsonPath processor. Both timestamps are converted to numbers and then compared.
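As a rough illustration of what that comparison does, here is a small Python sketch using the sample values assumed above. It is not NiFi code: the function name file_is_newer is hypothetical, and the strptime pattern is my translation of the "yyyy-MM-ddHH:mm:ss" format used in the expression.

```python
from datetime import datetime

# Python counterpart of the pattern "yyyy-MM-ddHH:mm:ss" from the expression.
PATTERN = "%Y-%m-%d%H:%M:%S"


def file_is_newer(filename: str, db_time: str) -> bool:
    """Compare the timestamp after the first '_' in filename with db_time."""
    # Everything after the first '_' is assumed to be the timestamp.
    stamp = filename.partition("_")[2]
    file_dt = datetime.strptime(stamp, PATTERN)
    db_dt = datetime.strptime(db_time, PATTERN)
    # The expression compares the two dates as numbers; comparing the
    # parsed datetimes directly gives the same ordering.
    return file_dt > db_dt


if __name__ == "__main__":
    # Sample values assumed in the answer: file timestamp vs. DB timestamp.
    print(file_is_newer("fn_2017-08-2012:09:10", "2017-08-2012:08:10"))
```

A file passes the check only when its timestamp is later than the one returned by the query, so files that are older than or equal to db_time are not fetched.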

Refer to the NiFi Expression Language documentation for more details.


Upvotes: 1
