JensU

Reputation: 21

Google Data Fusion: "Looping" over input data to then execute multiple Restful API calls per input row

I have the following challenge I would like to solve preferably in Google Data Fusion:
I have one web service that returns about 30-50 elements describing an invoice in a JSON payload like this:

{
  "invoice-services": [
    {
      "serviceId": "[some-20-digit-string]",
      // some other stuff omitted
    },
    [...]
  ]
}

For each occurrence of serviceId I then need to call another webservice, https://example.com/api/v2/services/{serviceId}/items, once per serviceId returned by the first call. I am only interested in the data from the second call, which is to be persisted into BigQuery. This second service call doesn't support wildcards or any other mechanism to aggregate the items - i.e. if the first call returns 30 serviceIds, I need to call the second webservice 30 times.

I have made the first call work, I have made the second call work with a hard coded serviceId and also the persistence into BigQuery. These calls simply use the Data Fusion HTTP adapter.

However, how can I use the output of the first service so that I issue one call to the second webservice for each row returned from the first call - effectively looping over all serviceIds?

I completely appreciate this is very easy in Python code, but for maintainability and fit with our environment I would prefer to solve this in Data Fusion or, if need be, in any of the other as-a-Service offerings from Google.

Any help is really appreciated! J

PS: This is NOT a big data problem - I am looking at about 50 serviceIds and maybe 300 items.
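
For illustration, the plain-Python version I have in mind is roughly the following - the list endpoint URL and the "items" field in the details response are placeholders, since I haven't shown those payloads here:

import requests

# placeholder URL for the first webservice that returns the invoice-services list
LIST_URL = "https://example.com/api/v2/invoice-services"

invoice_services = requests.get(LIST_URL).json()["invoice-services"]

all_items = []
for service in invoice_services:
    service_id = service["serviceId"]
    items_url = "https://example.com/api/v2/services/%s/items" % service_id
    # "items" is a placeholder for however the second service structures its response
    all_items.extend(requests.get(items_url).json()["items"])

# all_items would then be written to BigQuery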

Upvotes: 2

Views: 1144

Answers (1)

Rene2000k

Reputation: 11

This is a bit tricky to do, as the HTTP plugin doesn't seem to have built-in functionality for iterating over IDs taken from another response.

After a bit of experimenting, we found a workaround for our use case that does this within a single HTTP plugin:

  1. Write your list URL / API endpoint into the HTTP plugin's URL field. This will be used for the first request.
  2. In the Pagination section choose the "Custom" option. There you have to provide a Python script containing the pagination logic. The script's function is called once after every response the HTTP plugin receives. Please be aware that this is Python 2 code.
    The script is used here to parse the item IDs from the list response and to iterate over all of them to fetch the details from the second API endpoint.
import json

# get_next_page_url is the predefined function that is called after every
# successful HTTP response.
# url:     string of the last called URL
# page:    string of the response content (our JSON)
# headers: response headers
# It needs to return the next URL to call as a string, or None to stop.

def get_next_page_url(url, page, headers):
    # parse the response content as a JSON object
    page_json = json.loads(page)

    # we need some way to differentiate between the list API endpoint and the
    # details API endpoint; this is the branch for the list API endpoint,
    # collecting all item IDs
    if url == "<your-list-api-endpoint>":
        # iterate over all items in the list and collect their IDs
        item_ids = []
        for item in page_json["invoice-services"]:
            item_ids.append(item["serviceId"])

        # write the collected item IDs to the local file system so they are
        # still available in the next function call
        with open("/tmp/id_list", "w") as file:
            for item_id in item_ids:
                file.write("%s\n" % str(item_id))

        # return the URL of the first item's details to continue
        return "https://example.com/api/v2/services/" + str(item_ids[0]) + "/items"

    # this branch is used with the details URL to iterate over all IDs
    else:
        # read all item IDs back from the file system
        with open("/tmp/id_list", "r") as file:
            item_ids = [item_id.strip() for item_id in file.readlines()]

        # get the ID that was used in the last call from the URL
        current_id = url.split("/")[-2]

        # find the index of the current item ID in the list
        pos_index = item_ids.index(current_id)

        # if the last item in the ID list has been reached, stop
        if len(item_ids) == pos_index + 1:
            return None

        # otherwise continue with the next item ID
        return "https://example.com/api/v2/services/" + str(item_ids[pos_index + 1]) + "/items"
  3. This procedure leads the HTTP plugin to output two different kinds of content as records: the list JSON and the details JSON. To cope with this, you can set the HTTP plugin's output format to "text".
    Afterwards you can use the JSON parser plugin to parse your item details. When this plugin receives the list JSON content, it will set every field to null (as it cannot find the fields defined for the details).
    Then you can use the Wrangler plugin to filter out all empty (null) records to get rid of the list content.
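
As a plain-Python illustration of what the parse-and-filter step does with the "text" records (the "items" field name is just an assumed example - adjust it to whatever your details payload actually contains):

import json

def keep_detail_records(text_records):
    detail_records = []
    for record in text_records:
        body = json.loads(record)
        # the list response has none of the details fields, so it would become
        # an all-null record after parsing and gets dropped here, just like
        # Wrangler filters out the null records in the pipeline
        if body.get("items") is not None:
            detail_records.append(body)
    return detail_records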

Upvotes: 1
