Kaitharshayr

Reputation: 23

How to get data from an API using Apache Beam (Dataflow)?

I have done some Python programming, but I'm not a seasoned developer by any stretch of the imagination. We have a Python ETL program, which was set up as a Cloud Function, but it is timing out because there is just too much data to load, so we are looking to rewrite it to run on Dataflow.

The code at the moment simply connects to an API, which returns newline-delimited JSON, and then the data is loaded into a new table in BigQuery.

This is our first time using Dataflow and we are just trying to get to grips with how it works. It seems pretty easy to get the data into BigQuery; the stumbling block we are hitting is how to get the data out of the API. It's not clear to us how we can make this work. Do we need to go down the route of developing a new I/O connector as per [Develop IO Connector]? Or is there another option, as developing a new connector seems complex?

We've done a lot of googling, but haven't found anything obvious to help.

Here is a sample of our code, but we are not 100% sure it's on the right track. The code doesn't work, and we think it needs to be an .io.read and not a .ParDo initially, but we aren't quite sure where to go with that. Some guidance would be much appreciated!

import logging

import apache_beam as beam
import requests
from requests.exceptions import HTTPError


class callAPI(beam.DoFn):
    def __init__(self, input_header, input_uri):
        self.headers = input_header
        self.remote_url = input_uri

    def process(self):
        try:
            res = requests.get(self.remote_url, headers=self.headers)
            res.raise_for_status()
        except HTTPError as message:
            logging.error(message)
            return
        return res.text

with beam.Pipeline() as p:

    data = ( p
                | 'Call API' >> beam.ParDo(callAPI(HEADER, REMOTE_URI))
                | beam.Map(print))

Thanks in advance.

Upvotes: 1

Views: 5239

Answers (2)

robertwb

Reputation: 5104

You are on the right track, but there are a couple of things to fix.

As you point out, the root of a pipeline needs to be a read of some kind. The ParDo operation processes a set of elements (ideally in parallel), but needs some input to process. You could do

p | beam.Create(['a', 'b', 'c']) | beam.ParDo(SomeDoFn())

in which SomeDoFn will have a, b, and c passed into its process method. There is a special p | beam.Impulse() operation that produces a single None element, for cases where there is no reasonable input and you want to ensure your DoFn is called just once. You can also read elements from a file (or similar). Note that your process method should take both self and the element to be processed, and should return an iterable (to allow zero or more outputs; there are also beam.Map and beam.FlatMap, which encapsulate the simpler patterns). So you could do something like

import logging

import apache_beam as beam
import requests
from requests.exceptions import HTTPError


class CallAPI(beam.DoFn):
    def __init__(self, input_header):
        self.headers = input_header

    def process(self, input_uri):
        try:
            res = requests.get(input_uri, headers=self.headers)
            res.raise_for_status()
        except HTTPError as message:
            logging.error(message)
            return  # emit nothing for this URI on failure
        yield res.text

with beam.Pipeline() as p:

    data = (
        p
        | beam.Create([REMOTE_URI])
        | 'Call API' >> beam.ParDo(CallAPI(HEADER))
        | beam.Map(print))

which would allow you to read from more than one URI (in parallel) in the same pipeline.
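Since your end goal is to land the data in BigQuery, here is a rough sketch of how the pipeline above could continue, splitting the newline-delimited JSON response into rows and writing them with beam.io.WriteToBigQuery. The table name and schema below are placeholders, not anything from your actual project.

import json

import apache_beam as beam

# Placeholder table and schema -- substitute your own project/dataset/table.
BQ_TABLE = 'my-project:my_dataset.api_data'
BQ_SCHEMA = 'id:STRING,value:FLOAT'

with beam.Pipeline() as p:
    (
        p
        | beam.Create([REMOTE_URI])
        | 'Call API' >> beam.ParDo(CallAPI(HEADER))
        # The API returns newline-delimited JSON, so split each response
        # body into lines and parse each line into a dict (one BQ row).
        | 'Split lines' >> beam.FlatMap(lambda body: body.splitlines())
        | 'Parse JSON' >> beam.Map(json.loads)
        | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
            BQ_TABLE,
            schema=BQ_SCHEMA,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))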

You could write a full IO connector if your source is such that it can be split (ideally dynamically) rather than only read in one huge request.
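Short of a full connector, if the API happens to support pagination you can get most of the benefit of splitting by fanning the page requests out yourself. The ?page= parameter below is hypothetical, just to illustrate the idea.

import apache_beam as beam

# Hypothetical: assumes the API accepts a ?page=N query parameter.
PAGE_COUNT = 100
page_uris = [f'{REMOTE_URI}?page={n}' for n in range(PAGE_COUNT)]

with beam.Pipeline() as p:
    (
        p
        | beam.Create(page_uris)
        # Reshuffle prevents the fetches from being fused onto a single
        # worker, so the pages can actually be downloaded in parallel.
        | beam.Reshuffle()
        | 'Fetch page' >> beam.ParDo(CallAPI(HEADER))
        | beam.Map(print))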

Upvotes: 5

Conall Daly

Reputation: 63

Can you share the code from your Cloud Function? Is this a scheduled task or is it triggered by an event? If it is a scheduled task, Apache Airflow may be a better option: you could use the Dataflow Python operator and BigQuery operators to do what you're looking for.
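For example, a bare-bones DAG might look something like this. This is just a sketch using the older contrib DataFlowPythonOperator; the bucket path, project, region and schedule are all placeholders.

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator

# Placeholder values -- substitute your own project, bucket and schedule.
default_args = {'start_date': datetime(2021, 1, 1)}

with DAG('api_to_bigquery', default_args=default_args,
         schedule_interval='@daily', catchup=False) as dag:

    run_beam_pipeline = DataFlowPythonOperator(
        task_id='run_beam_pipeline',
        py_file='gs://my-bucket/pipelines/api_to_bq.py',  # your Beam pipeline file
        options={'project': 'my-project', 'region': 'europe-west1'})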

Upvotes: 1
