Reputation: 23
I have done some Python programming, but I'm not a seasoned developer by any stretch of the imagination. We have a Python ETL program that was set up as a Cloud Function, but it is timing out because there is just too much data to load, and we are looking to rewrite it to work in Dataflow.
The code at the moment simply connects to an API, which returns newline-delimited JSON, and the data is then loaded into a new table in BigQuery.
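For context, the Cloud Function boils down to roughly the following (heavily simplified; the table name and the HEADER/REMOTE_URI values are placeholders):

import io

import requests
from google.cloud import bigquery

def load_api_to_bq(request):
    # Pull the newline-delimited JSON from the API in one request
    res = requests.get(REMOTE_URI, headers=HEADER)
    res.raise_for_status()

    # Load the payload straight into a new BigQuery table
    client = bigquery.Client()
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        autodetect=True,
    )
    client.load_table_from_file(
        io.BytesIO(res.content), "project.dataset.table", job_config=job_config
    ).result()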
This is our first time using Dataflow and we are just trying to get to grips with how it works. It seems pretty easy to get the data into BigQuery; the stumbling block we are hitting is how to get the data out of the API. It's not clear to us how we can make this work. Do we need to go down the route of developing a new I/O connector as per [Develop IO Connector]? Or is there another option, as developing a new connector seems complex?
We've done a lot of googling, but haven't found anything obvious to help.
Here is a sample of our code, but we are not 100% sure it's on the right track. The code doesn't work, and we think it needs to be a .io.read and not a .ParDo initially, but we aren't quite sure where to go with that. Some guidance would be much appreciated!
import logging

import apache_beam as beam
import requests
from requests.exceptions import HTTPError

class callAPI(beam.DoFn):
    def __init__(self, input_header, input_uri):
        self.headers = input_header
        self.remote_url = input_uri

    def process(self):
        try:
            res = requests.get(self.remote_url, headers=self.headers)
            res.raise_for_status()
        except HTTPError as message:
            logging.error(message)
            return
        return res.text

with beam.Pipeline() as p:
    data = (
        p
        | 'Call API ' >> beam.ParDo(callAPI(HEADER, REMOTE_URI))
        | beam.Map(print))
Thanks in advance.
Upvotes: 1
Views: 5239
Reputation: 5104
You are on the right track, but there are a couple of things to fix.
As you point out, the root of a pipeline needs to be a read of some kind. The ParDo operation processes a set of elements (ideally in parallel), but it needs some input to process. You could do

p | beam.Create(['a', 'b', 'c']) | beam.ParDo(SomeDoFn())

in which SomeDoFn will be passed 'a', 'b', and 'c' to its process method. There is a special p | beam.Impulse() operation that produces a single None element, for when there's no reasonable input and you want to ensure your DoFn is called just once. You can also read elements from a file (or similar). Note that your process method takes both self and the element to be processed, and returns an iterable (to allow zero or more outputs; there are also beam.Map and beam.FlatMap, which encapsulate the simpler patterns). So you could do something like
class CallAPI(beam.DoFn):
    def __init__(self, input_header):
        self.headers = input_header

    def process(self, input_uri):
        try:
            res = requests.get(input_uri, headers=self.headers)
            res.raise_for_status()
        except HTTPError as message:
            logging.error(message)
        yield res.text

with beam.Pipeline() as p:
    data = (
        p
        | beam.Create([REMOTE_URI])
        | 'Call API ' >> beam.ParDo(CallAPI(HEADER))
        | beam.Map(print))
which would allow you to read from more than one URI (in parallel) in the same pipeline.
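As a side note, the FlatMap version of the same thing could look roughly like this (a sketch using the same imports, HEADER and REMOTE_URI as above):

def call_api(input_uri, headers):
    # Fetch one URI and emit its body as a single element
    try:
        res = requests.get(input_uri, headers=headers)
        res.raise_for_status()
        yield res.text
    except HTTPError as message:
        logging.error(message)

with beam.Pipeline() as p:
    data = (
        p
        | beam.Create([REMOTE_URI])
        | 'Call API' >> beam.FlatMap(call_api, headers=HEADER)
        | beam.Map(print))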
You could write a full IO connector if your source is such that it can be split (ideally dynamically) rather than only read in one huge request.
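To finish the path described in the question (newline-delimited JSON out of the API into a new BigQuery table), a rough sketch could split the response into rows and hand them to beam.io.WriteToBigQuery; the table name and schema below are just placeholders:

import json

def to_rows(ndjson_text):
    # One API response body -> one dict per non-empty line
    for line in ndjson_text.splitlines():
        if line.strip():
            yield json.loads(line)

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([REMOTE_URI])
        | 'Call API' >> beam.ParDo(CallAPI(HEADER))
        | 'To rows' >> beam.FlatMap(to_rows)
        | 'Write' >> beam.io.WriteToBigQuery(
            'my_dataset.my_table',
            schema='name:STRING,value:FLOAT',  # placeholder schema
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))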
Upvotes: 5
Reputation: 63
Can you share the code from your Cloud Function? Is this a scheduled task or is it triggered by an event? If it is a scheduled task, Apache Airflow may be a better option; you could use the Dataflow Python operators and BigQuery operators to do what you're looking for.
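If you go the Airflow route, a minimal DAG could look something like the sketch below; the bucket, project, region and file path are placeholders, and the exact operator class depends on your Airflow and Google provider versions:

from airflow import DAG
from airflow.providers.google.cloud.operators.dataflow import DataflowCreatePythonJobOperator
from airflow.utils.dates import days_ago

with DAG('api_to_bq', schedule_interval='@daily', start_date=days_ago(1), catchup=False) as dag:
    run_etl = DataflowCreatePythonJobOperator(
        task_id='run_dataflow_etl',
        py_file='gs://my-bucket/pipelines/api_to_bq.py',  # the Beam pipeline, uploaded to GCS
        job_name='api-to-bq',
        location='europe-west1',
        options={
            'project': 'my-project',
            'temp_location': 'gs://my-bucket/temp',
        },
    )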
Upvotes: 1