funnelCONN

Improving a flow in Apache NiFi

I'm trying to simplify flow in Apache NiFi.

What I want:

  1. Call the Facebook Graph API to get the campaigns for the ad accounts and save them to the DB. Response example:
[ {
  "start_date" : "2018-10-15",
  "stop_date" : "2019-03-31",
  "id" : "608962192",
  "account_id" : "1007311",
  "name" : "Axe_Instagram_aug-dec2018_col",
  "status" : "ACTIVE",
  "start_time" : "2018-10-15",
  "stop_time" : "2019-03-31"
}, {
  "start_date" : "2018-10-08",
  "stop_date" : "2018-10-31",
  "id" : "61084542",
  "account_id" : "10240051",
  "name" : "Axe_IG_aug-dec2018",
  "status" : "ACTIVE",
  "start_time" : "2018-10-08",
  "stop_time" : "2018-10-31"
} ]
  2. Call the Facebook Graph API to get the ads for the ad accounts and save them to the DB. Response example:
[
   {
      "id":"23845",
      "account_id":"251977841",
      "name":"Post_2",
      "status":"ACTIVE",
      "campaign_id":"2384345125",
      "adset_id":"238125",
      "bid_amount":87,
      "updated_time":"2019-06-20T14:21:06+0300"
   },
   {
      "id":"23843453786320125",
      "account_id":"2251971478158841",
      "name":"Post_1",
      "status":"ACTIVE",
      "campaign_id":"238225",
      "adset_id":"2384325",
      "bid_amount":87,
      "updated_time":"2019-06-20T14:21:06+0300"
   }
]
  3. Filter the ads:
    • Keep only active campaigns (from the campaigns response), using these rules: stop_date is empty (NULL) OR stop_date > '2021-01-01'.
    • Check whether each ad's campaign_id is in the result set above.
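As a rough illustration of the two filtering rules above, here is a minimal Python sketch that applies them to already-parsed records. It is not NiFi code: the record dicts are hypothetical and cut down to the fields the rules need, and it assumes an empty-string stop_date should count as "empty" just like NULL.

```python
# Cut-off date taken from the question's filtering rule.
CUTOFF = "2021-01-01"


def is_active(campaign: dict) -> bool:
    """stop_date is empty (NULL, or assumed: an empty string) or after CUTOFF."""
    stop = campaign.get("stop_date")
    # YYYY-MM-DD strings sort lexicographically in date order, so a plain
    # string comparison matches comparing the dates themselves.
    return not stop or stop > CUTOFF


def filter_ads(ads: list, campaigns: list) -> list:
    """Keep only ads whose campaign_id belongs to an active campaign."""
    active_ids = {c["id"] for c in campaigns if is_active(c)}  # set: O(1) lookups
    return [ad for ad in ads if ad["campaign_id"] in active_ids]


# Hypothetical sample data, not taken from the Graph API.
campaigns = [
    {"id": "1", "stop_date": None},          # no end date -> active
    {"id": "2", "stop_date": "2019-03-31"},  # ended before the cut-off
    {"id": "3", "stop_date": "2021-06-30"},  # ends after the cut-off
]
ads = [
    {"id": "a", "campaign_id": "1"},
    {"id": "b", "campaign_id": "2"},
    {"id": "c", "campaign_id": "3"},
    {"id": "d", "campaign_id": "99"},  # campaign missing from the response
]
print([ad["id"] for ad in filter_ads(ads, campaigns)])  # ['a', 'c']
```

Building the set of active IDs once and testing membership against it is the same "semi-join" the per-ad DB lookup performs, just without a round trip per ad.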

My current approach is to split the ads into one flowfile per ad and run this query in ExecuteSQL for each one:

select *
from facebook_api.campaigns c 
where c.id = '${campaign.id}'
and (c.stop_date is null or c.stop_date > '2021-01-01')

This returns either no rows or the matching campaign if it is active (by my criteria). I then filter the flowfiles with RouteOnAttribute: ${executesql.rows.count:lt(1)}.
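To make the cost concrete, here is a small Python simulation of this per-flowfile approach. It assumes a hypothetical in-memory SQLite table standing in for facebook_api.campaigns (SQLite has no facebook_api schema, so the table is just called campaigns), and hypothetical ad records. Each ad triggers its own query, and an empty result is what ${executesql.rows.count:lt(1)} would match.

```python
import sqlite3

# Hypothetical stand-in for facebook_api.campaigns (reduced to two columns).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE campaigns (id TEXT PRIMARY KEY, stop_date TEXT)")
conn.executemany(
    "INSERT INTO campaigns VALUES (?, ?)",
    [("1", None), ("2", "2019-03-31"), ("3", "2021-06-30")],
)

# Same predicate as the per-flowfile query, with the campaign id bound as a parameter.
QUERY = (
    "SELECT * FROM campaigns c "
    "WHERE c.id = ? AND (c.stop_date IS NULL OR c.stop_date > '2021-01-01')"
)


def route_per_ad(ads):
    """Mimic split -> ExecuteSQL -> RouteOnAttribute: one query per ad."""
    kept, dropped, queries = [], [], 0
    for ad in ads:
        rows = conn.execute(QUERY, (ad["campaign_id"],)).fetchall()
        queries += 1
        # ${executesql.rows.count:lt(1)} matches the empty results (inactive campaigns).
        (dropped if len(rows) < 1 else kept).append(ad["id"])
    return kept, dropped, queries


ads = [{"id": f"ad{i}", "campaign_id": str(i % 3 + 1)} for i in range(9)]
kept, dropped, queries = route_per_ad(ads)
print(kept, dropped, queries)
# ['ad0', 'ad2', 'ad3', 'ad5', 'ad6', 'ad8'] ['ad1', 'ad4', 'ad7'] 9
```

The query count grows with the number of ads, not with the number of distinct campaigns, which is exactly why 100,000 split flowfiles turn into 100,000 DB requests.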

But there is a problem: splitting the 300 source flowfiles creates about 100,000 flowfiles, so I end up making about 100,000 unnecessary requests to the DB.

Can I perform the same check without splitting the flowfiles?

Answers (1)

Sdairs

Using SplitJson here is really inefficient and probably not needed.

You could do this with PartitionRecord instead, which creates FFs grouped by campaign_id and also adds the campaign_id as an attribute (the attribute takes the name of the dynamic property you add, e.g. campaign.id with the RecordPath /campaign_id). This means you need neither SplitJson nor the EvaluateJsonPath processors, and you only end up with as many FlowFiles as there are unique campaign_ids in the original FF.
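A rough Python analogue of the PartitionRecord idea, again using a hypothetical in-memory SQLite table in place of the real DB and hypothetical ad records: group the ads by campaign_id first, then run the lookup once per group instead of once per ad. The grouping function only approximates what PartitionRecord does with a RecordPath such as /campaign_id; it is not NiFi's implementation.

```python
import sqlite3
from collections import defaultdict

# Hypothetical stand-in for facebook_api.campaigns (reduced to two columns).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE campaigns (id TEXT PRIMARY KEY, stop_date TEXT)")
conn.executemany(
    "INSERT INTO campaigns VALUES (?, ?)",
    [("1", None), ("2", "2019-03-31"), ("3", "2021-06-30")],
)
QUERY = (
    "SELECT * FROM campaigns c "
    "WHERE c.id = ? AND (c.stop_date IS NULL OR c.stop_date > '2021-01-01')"
)


def partition_by_campaign(ads):
    """Group ad records by campaign_id (roughly PartitionRecord with /campaign_id)."""
    groups = defaultdict(list)
    for ad in ads:
        groups[ad["campaign_id"]].append(ad)
    return groups


def route_per_partition(ads):
    """Run the lookup once per campaign_id group instead of once per ad."""
    kept, queries = [], 0
    for campaign_id, group in partition_by_campaign(ads).items():
        rows = conn.execute(QUERY, (campaign_id,)).fetchall()
        queries += 1
        if rows:  # a non-empty result means the campaign is active
            kept.extend(ad["id"] for ad in group)
    return kept, queries


ads = [{"id": f"ad{i}", "campaign_id": str(i % 3 + 1)} for i in range(9)]
kept, queries = route_per_partition(ads)
print(sorted(kept), queries)  # ['ad0', 'ad2', 'ad3', 'ad5', 'ad6', 'ad8'] 3
```

The kept ads are the same as with one query per ad, but the number of queries now equals the number of distinct campaign_ids (3 here instead of 9).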

Edit: I misread this part at first and assumed you were using QueryRecord; updated.

Your original ExecuteSQL will still work, but it now has far fewer FFs to execute on.

However, I'd question why you need to go through an intermediate DB in the first place. Why not have NiFi filter the raw results it gets from the Facebook API?

You could replace the ExecuteSQL with a QueryRecord that does:

select *
from FLOWFILE where (stop_date is null or stop_date > '2021-01-01')

Add it as a dynamic property named ACTIVE, so that only the matching records are passed to the ACTIVE relationship. This removes the need for the intermediate DB.
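As a sketch of what that QueryRecord query does to the records, the Python snippet below runs the same WHERE clause over a hypothetical campaigns payload (only the first record's id and stop_date come from the question's response example). SQLite is only a stand-in here: QueryRecord evaluates its SQL with Apache Calcite, so dialect details may differ. The comparison works on the raw strings because YYYY-MM-DD dates sort lexicographically in date order.

```python
import json
import sqlite3

# Hypothetical campaigns payload, reduced to the fields the filter needs.
payload = json.loads("""[
  {"id": "608962192", "stop_date": "2019-03-31"},
  {"id": "111", "stop_date": null},
  {"id": "222", "stop_date": "2021-06-30"}
]""")

WHERE = "stop_date IS NULL OR stop_date > '2021-01-01'"


def query_record(records, where):
    """Return the records matching `where`, emulating one QueryRecord relationship.

    `where` is trusted, hard-coded SQL: it is interpolated, not bound.
    """
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("CREATE TABLE FLOWFILE (id TEXT, stop_date TEXT)")
        # JSON null becomes Python None, which SQLite stores as NULL.
        conn.executemany("INSERT INTO FLOWFILE VALUES (:id, :stop_date)", records)
        cur = conn.execute(f"SELECT * FROM FLOWFILE WHERE {where}")
        cols = [d[0] for d in cur.description]
        return [dict(zip(cols, row)) for row in cur.fetchall()]
    finally:
        conn.close()


active = query_record(payload, WHERE)  # records for the ACTIVE relationship
print(sorted(r["id"] for r in active))  # ['111', '222']
```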

The resulting flow would look something like:

InvokeHTTP (hit the Facebook API) -> PartitionRecord (split FFs by campaign ID) -> QueryRecord (drop all inactive campaigns)

Another thing to consider: I don't know the Facebook Graph API very well, but are there really no query parameters you could add so that the filtering is done on the Facebook side?
