FVCC
FVCC

Reputation: 317

How to perform multiple HTTP calls from within a GCP DataFusion / CDAP pipeline

I have a GCP Data Fusion pipeline where I am performing a GET request on an API which returns me a JSON list of user information including the user id. I am able to do this successfully with the Data Fusion HTTP plugin (available in the Data Fusion HUB). Here is an example of such a list:

[
{
  "id" : "adsa6d672",
  "firstName" : "John",
  "lastName" : "Doe"
},
{
  "id" : "adsa6d672",
  "firstName" : "John",
  "lastName" : "Doe"
}
]

Based on this list (which I have successfully parsed), instead of just sinking it to a database, I want to make a HTTP call on a per user id basis on another API. Is there a way of doing this on a single Data Fusion pipeline (without having to bring another pipeline up just to perform one HTTP request?

I tried using the Data Fusion python transformation plugin (available in the HUB) and use the python requests library to perform the http requests, but this ended up in an error as the requests library is not installed in the python interpreter that runs the transformation plugin.

Upvotes: 0

Views: 486

Answers (2)

Rene2000k
Rene2000k

Reputation: 11

This is a bit tricky to do as the HTTP plugin doesn't seem to have a default functionality for iterating IDs from another response.

After a bit of experimenting, we found a way to do this in one HTTP plugin for our use case as a "workaround":

  1. Write your list URL / API endpoint into the HTTP plugin's URL field. This will be used for the first request.
  2. In the Pagination section choose the "Custom" option. There you have to provide a Python script containing the pagination logic. The scripts function is called once after every response the HTTP plugin receives. Please be aware that this is Python 2 code.
    In our case, the script is used to parse the item list / IDs and to iterate over all IDs to get the details from a different API endpoint.
import json
import os

# this is a predefined function that is called after every successful HTTP response
# url: string of the last called URL
# page: string of the response content (our JSON)
# headers: response headers
# This function needs to return a string of the next URL to call or None to stop

def get_next_page_url(url, page, headers):
    # parse the content as JSON object
    page_json = json.loads(page)

    # we need some way to differentiate between the list API endpoint and details API endpoint
    # this is the branch for the list API endpoint to collect all item IDs
    if url == "<your-list-api-endpoint>":
        # iterating over all item IDs in the list
        item_ids = []
        for item in page_json["<your-item-array>"]:
          item_ids.append(item["id"])      
   
        # we need to write the collected item IDs to the local file system to have them present in the next function call
        with open("/tmp/id_list", "w") as file:
            for item_id in item_ids:
                file.write("%s\n" % str(item_id))

        # return the URL of the first item details to continue
        return "<your-details-api-endpoint>/" + str(item_ids[0])

    # this branch is used with the details URL to iterate over all IDs
    else:
       # get all your item IDs by reading them from file system again
        with open("/tmp/id_list", "r") as file:
            item_ids = [item_id.strip() for item_id in file.readlines()]
            # get the last used id from the url
            current_id = url.split("/")[-1]

            # find the index of the current item ID in the list
            pos_index = item_ids.index(current_id)
            # if last item in ID list is reached, stop
            if len(item_ids) == pos_index + 1:
                return None

            # continue with next item id
            else:
                return "<your-details-api-endpoint>/" + str(item_ids[pos_index+1])
  1. This procedure leads the HTTP plugin to output two different kinds of contents as records: The list JSON and the details JSON. To cope with this, you can put the HTTPs output format to "text".
    Afterwards you can use the JSON parser plugin to parse your item details. When this plugin receives the list JSON content, it will put every entry to "null" (as it cannot find the fields defined for the details).
    Then you could use the Wrangler plugin to filter out all emtpy (null) records to get rid of the list content.

Upvotes: 0

Edwin Elia
Edwin Elia

Reputation: 399

Have you tried using the HTTP sink? From the Hub, you can find the HTTP plugins, which will contain the HTTP sink.

Upvotes: 0

Related Questions