Reputation: 642
I am pretty new to Dataflow. I have a batch job that scrapes data from a website and loads it into BigQuery:
import apache_beam as beam

def get_data_from_id(id):
    # scrape data using the input id
    return id

def process_data(data):
    # process data into a dataframe
    return dataframe

def load_table(dataframe):
    # load the dataframe to a BQ table; does not return anything
    pass

# create the beam pipeline
p = beam.Pipeline()
id_no = p | "Input id" >> beam.Create(['G123', 'G244', 'G444'])
data = id_no | "Scrape data" >> beam.Map(get_data_from_id)
data = data | "process data" >> beam.FlatMap(process_data)
data | "load to bq" >> beam.Map(load_table)
result = p.run()
This works normally and loads data to the BQ table using the Dataflow runner. My question is: I want to add another process (a plain Python function) that runs after "load to bq" is done, without needing any output from "load to bq".
So, I tried with
def additional_function():
    # run something
    # no input argument
    ...

...
result = p.run()
result.wait_until_finish()
additional_function()
This worked fine using the DirectRunner, but I need to run it with the Dataflow runner, and I found out that only steps inside the beam.Pipeline are applied there. The function does not show up in the Dataflow console and does not run either.
The additional function doesn't need any input, but if I add | "do additional fn" >> beam.Map(additional_function) to the next line of the Beam flow, it errors because the step receives 1 input while additional_function() takes 0 arguments.
So, how do I include this function in the beam.Pipeline so that it runs with the Dataflow runner?
Edit:
If I do something like this
data = data | "load to bq" >> beam.Map(load_table)
data | "addtional fn" >> beam.Map(additional_function)
It will throw an error like this
TypeError: additional_function() takes 0 positional arguments but 1 was given [while running 'addtional fn']
If I don't pipe the output, like this,
data = data | "load to bq" >> beam.Map(load_table)
"addtional fn" >> beam.Map(additional_function)
then additional_function() will not be run either.
Upvotes: 2
Views: 1263
Reputation: 1629
Your additional function must be included in the Beam pipeline to be exported to Dataflow.
If you want it to be executed at the end, after "load to bq", try this:
def additional_function(param):
    # all the logic you want
    return None  # as it's a Map function, it needs an input parameter and can return a value

data = data | "load to bq" >> beam.Map(load_table)
data | "additional_function" >> beam.Map(additional_function)
Upvotes: 2