Reputation: 317
I have a GCP Data Fusion pipeline in which I perform a GET request against an API that returns a JSON list of user information, including the user ID. I can do this successfully with the Data Fusion HTTP plugin (available in the Data Fusion Hub). Here is an example of such a list:
[
  {
    "id" : "adsa6d672",
    "firstName" : "John",
    "lastName" : "Doe"
  },
  {
    "id" : "adsa6d672",
    "firstName" : "John",
    "lastName" : "Doe"
  }
]
Based on this list (which I have successfully parsed), instead of just sinking it to a database, I want to make an HTTP call per user ID against another API. Is there a way of doing this in a single Data Fusion pipeline (without having to bring up another pipeline just to perform one HTTP request)?
I tried using the Data Fusion Python transformation plugin (available in the Hub) together with the Python requests library to perform the HTTP requests, but this ended in an error because the requests library is not installed in the Python interpreter that runs the transformation plugin.
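For what it's worth, Python's standard-library HTTP client is available even where requests is not, so something like the following is what I had in mind. A minimal sketch, assuming the plugin runs a Python 3 interpreter (on Jython 2.x it would be urllib2 instead) and using a hypothetical details endpoint:

import json
import urllib.request

# Data Fusion Python transform entry point, called once per input record
def transform(record, emitter, context):
    # hypothetical details endpoint; replace with the real API
    url = "https://example.com/api/users/" + record["id"]
    with urllib.request.urlopen(url) as response:
        details = json.loads(response.read().decode("utf-8"))
    # merge the fetched fields into the record and emit it downstream
    record.update(details)
    emitter.emit(record)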
Upvotes: 0
Views: 486
Reputation: 11
This is a bit tricky, as the HTTP plugin doesn't have built-in functionality for iterating over IDs taken from another response.
After a bit of experimenting, we found a workaround that does this with a single HTTP plugin for our use case, using the plugin's custom pagination code:
import json

# this is a predefined function that is called after every successful HTTP response
# url: string of the last called URL
# page: string of the response content (our JSON)
# headers: response headers
# it must return a string with the next URL to call, or None to stop
def get_next_page_url(url, page, headers):
    # parse the content as a JSON object
    page_json = json.loads(page)
    # we need some way to differentiate between the list API endpoint
    # and the details API endpoint;
    # this is the branch for the list API endpoint, collecting all item IDs
    if url == "<your-list-api-endpoint>":
        # iterate over all items in the list and collect their IDs
        item_ids = []
        for item in page_json["<your-item-array>"]:
            item_ids.append(item["id"])
        # write the collected item IDs to the local file system so they
        # are still available in the next function call
        with open("/tmp/id_list", "w") as file:
            for item_id in item_ids:
                file.write("%s\n" % str(item_id))
        # return the URL of the first item's details to continue
        return "<your-details-api-endpoint>/" + str(item_ids[0])
    # this branch is used with the details URL to iterate over all IDs
    else:
        # read the collected item IDs back from the file system
        with open("/tmp/id_list", "r") as file:
            item_ids = [item_id.strip() for item_id in file.readlines()]
        # get the last used ID from the URL
        current_id = url.split("/")[-1]
        # find the index of the current item ID in the list
        pos_index = item_ids.index(current_id)
        # if the last ID in the list has been reached, stop
        if len(item_ids) == pos_index + 1:
            return None
        # otherwise continue with the next item ID
        else:
            return "<your-details-api-endpoint>/" + str(item_ids[pos_index + 1])
Upvotes: 0
Reputation: 399
Have you tried using the HTTP sink? In the Hub you can find the HTTP plugins, which include the HTTP sink.
Upvotes: 0