Reputation: 149
I'm trying to simplify a flow in Apache NiFi.
What I want:
1. Get campaigns for ad accounts and save them to the DB.
Response example:
[ {
"start_date" : "2018-10-15",
"stop_date" : "2019-03-31",
"id" : "608962192",
"account_id" : "1007311",
"name" : "Axe_Instagram_aug-dec2018_col",
"status" : "ACTIVE",
"start_time" : "2018-10-15",
"stop_time" : "2019-03-31"
}, {
"start_date" : "2018-10-08",
"stop_date" : "2018-10-31",
"id" : "61084542",
"account_id" : "10240051",
"name" : "Axe_IG_aug-dec2018",
"status" : "ACTIVE",
"start_time" : "2018-10-08",
"stop_time" : "2018-10-31"
} ]
2. Get ads for ad accounts and save them to the DB.
Response example:
[
{
"id":"23845",
"account_id":"251977841",
"name":"Post_2",
"status":"ACTIVE",
"campaign_id":"2384345125",
"adset_id":"238125",
"bid_amount":87,
"updated_time":"2019-06-20T14:21:06+0300"
},
{
"id":"23843453786320125",
"account_id":"2251971478158841",
"name":"Post_1",
"status":"ACTIVE",
"campaign_id":"238225",
"adset_id":"2384325",
"bid_amount":87,
"updated_time":"2019-06-20T14:21:06+0300"
}
]
3. Filter ads (keep only those that belong to active campaigns) using these rules:
- stop_date should be empty (NULL) OR stop_date should be > '2021-01-01';
- campaign_id from ads is contained in the result set above.
My current approach is:
The 2 steps above are completed; all data is stored in the DB.
For each flow file from the ads API I'm using the following flow:
- SplitJson to separate the ads one by one;
- EvaluateJsonPath to store campaign_id in attributes;
- ExecuteSQL with the following statement for each flow file:
select *
from facebook_api.campaigns c
where c.id = '${campaign.id}'
and (c.stop_date is null or c.stop_date > '2021-01-01')
This returns either nothing or an active (by my criteria) campaign. After that I can filter them with RouteOnAttribute: ${executesql.row.count:lt(1)}.
But there is a problem: splitting the 300 source flow files creates about 100,000 flow files, so I end up making about 100,000 unnecessary requests to the DB.
Can I apply the same logic without splitting the flow files?
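For reference, the two filtering rules can be sketched outside NiFi as a single in-memory pass, with no per-ad lookup. This is only an illustration of the intended logic in Python, not a NiFi configuration; the function names and the sample records are hypothetical, and the cutoff date is the one from the rules above.

```python
from datetime import date

CUTOFF = date(2021, 1, 1)  # threshold taken from the filtering rules


def active_campaign_ids(campaigns, cutoff=CUTOFF):
    """Return ids of campaigns whose stop_date is NULL or later than cutoff."""
    ids = set()
    for campaign in campaigns:
        stop = campaign.get("stop_date")
        if stop is None or date.fromisoformat(stop) > cutoff:
            ids.add(campaign["id"])
    return ids


def filter_ads(ads, campaigns, cutoff=CUTOFF):
    """Keep only ads whose campaign_id belongs to an active campaign."""
    active = active_campaign_ids(campaigns, cutoff)
    return [ad for ad in ads if ad["campaign_id"] in active]


# Hypothetical sample data, shaped like the API responses.
campaigns = [
    {"id": "100", "stop_date": "2019-03-31"},  # ended before the cutoff
    {"id": "200", "stop_date": None},          # no stop date
    {"id": "300", "stop_date": "2021-06-30"},  # ends after the cutoff
]
ads = [
    {"id": "1", "campaign_id": "100"},
    {"id": "2", "campaign_id": "200"},
    {"id": "3", "campaign_id": "300"},
    {"id": "4", "campaign_id": "999"},  # campaign not in the DB
]

kept = filter_ads(ads, campaigns)
print([ad["id"] for ad in kept])
```

The set of active campaign ids is computed once and every ad is checked against it, which is the behaviour I'd like to reproduce without 100,000 separate queries.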
Upvotes: 0
Views: 65
Reputation: 2032
Doing the SplitJson is really inefficient and probably not needed here.
You could do this with PartitionRecord to create FFs that are grouped by campaign_id (and that also carry it as an attribute). This means that you do not need the SplitJson or EvaluateJsonPath processors. You end up with only as many FlowFiles as there are unique campaign_id values in the original FF.
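To make the grouping idea concrete, here is a rough Python sketch of what partitioning by campaign_id does to a batch of ad records. It only mimics the semantics and is not how NiFi implements it; `partition_by` is a hypothetical helper, and the third ad is made up so that one group has more than one record. In NiFi itself the grouping would be configured on PartitionRecord with a record reader and writer plus a user-defined property whose value is a RecordPath (presumably something like /campaign_id).

```python
from collections import defaultdict


def partition_by(records, field):
    """Group records by the value of `field`, one group per distinct value."""
    groups = defaultdict(list)
    for record in records:
        groups[record[field]].append(record)
    return dict(groups)


ads = [
    {"id": "23845", "campaign_id": "2384345125", "name": "Post_2"},
    {"id": "23843453786320125", "campaign_id": "238225", "name": "Post_1"},
    # Hypothetical extra ad that shares a campaign with the previous one.
    {"id": "99999", "campaign_id": "238225", "name": "Post_3"},
]

partitions = partition_by(ads, "campaign_id")
for campaign_id, group in partitions.items():
    # One output "FlowFile" per distinct campaign_id.
    print(campaign_id, len(group))
```

Three ads collapse into two groups here, so downstream processors run once per campaign rather than once per ad.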
*Edit: I read this part wrong and assumed you were using QueryRecord - updated
Now your original ExecuteSQL will still work, but has far fewer FFs to execute on.
However, I'd question why you need to hit an intermediary DB in the first place. Why not have NiFi filter the raw results from hitting the Facebook API?
You could replace the ExecuteSQL with a QueryRecord that does:
select *
from FLOWFILE where (stop_date is null or stop_date > '2021-01-01')
Passing only the matching records to an 'ACTIVE' relationship. This removes the need for the DB in the middle.
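If you want to sanity-check that predicate before wiring it into NiFi, a quick way is to run the same statement against an in-memory SQLite table named FLOWFILE. This is only a sketch: QueryRecord uses its own SQL engine rather than SQLite, so treat it as a check of the logic, not of NiFi's behaviour. The first two rows come from the campaigns response in the question; the last two are hypothetical, added so that something actually matches.

```python
import sqlite3

rows = [
    ("608962192", "2019-03-31"),       # from the question, ended in 2019
    ("61084542", "2018-10-31"),        # from the question, ended in 2018
    ("hypothetical-1", None),          # made up: no stop date
    ("hypothetical-2", "2021-06-30"),  # made up: ends after the cutoff
]

conn = sqlite3.connect(":memory:")
conn.execute("create table FLOWFILE (id text, stop_date text)")
conn.executemany("insert into FLOWFILE values (?, ?)", rows)

query = """
select *
from FLOWFILE where (stop_date is null or stop_date > '2021-01-01')
"""

# stop_date is compared as text; ISO dates (YYYY-MM-DD) sort correctly that way.
active = sorted(conn.execute(query).fetchall())
active_ids = [row[0] for row in active]
print(active_ids)
conn.close()
```

Note that both sample campaigns from the question are dropped, since their stop dates fall before 2021-01-01.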
The resulting flow would look something like:
InvokeHTTP (hit Facebook API) -> PartitionRecord (split FFs by campaign ID) -> QueryRecord (drop all inactive campaigns)
Another thing to consider: I don't know the Facebook Graph API very well, but are there no query parameters that you could add so that the filtering is done on the FB side?
Upvotes: 1