emp

Reputation: 642

Run a function with no input inside a beam.Pipeline using the Dataflow runner

I am pretty new to Dataflow. I have a batch job that scrapes data from a website and loads the data to BigQuery:

import apache_beam as beam
   

def get_data_from_id(id):
   # scrape data using input id
   return id

def process_data(data):
   # process data to dataframe
   return dataframe

def load_table(dataframe):
   # load dataframe to bq table
   # does not return anything
   pass

# create beam pipeline
p = beam.Pipeline()
id_no = p | "Input id" >> beam.Create(['G123', 'G244', 'G444'])
data = id_no | "Scrape data" >> beam.Map(get_data_from_id)
data = data | "process data" >> beam.FlatMap(process_data)
data | "load to bq" >> beam.Map(load_table)

result = p.run()

This worked normally and was able to load the data to a BigQuery table using the Dataflow runner. My question is: I want to add another process (a normal Python function) that runs after "load to bq" is done, without needing any output from "load to bq".

So, I tried this:

def additional_function():
   # run something
   # no input argument
   pass
...
...
result = p.run()
result.wait_until_finish()
additional_function()

This worked fine using the DirectRunner, but I need to run it with the Dataflow runner, and I found out that only functions inside the beam.Pipeline are exported to Dataflow. The function does not show up in the Dataflow console and it is not run either.

The additional function doesn't need any input, but if I add a | "do additional fn" >> beam.Map(additional_function) to the next line of the Beam flow, it errors that there is 1 input while additional_function() takes 0 arguments.
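
As far as I understand, beam.Map always calls the given function with each element of the input PCollection as its single argument, e.g. (a generic sketch, not part of my actual pipeline):

import apache_beam as beam

p = beam.Pipeline()
nums = p | "Create" >> beam.Create([1, 2, 3])
nums = nums | "Times ten" >> beam.Map(lambda x: x * 10)  # the callable receives exactly one element
nums | "Print" >> beam.Map(print)
p.run().wait_until_finish()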

So, how do I include this function in the beam.Pipeline so that it can run with the Dataflow runner?

Edit:

If I do something like this

data = data | "load to bq" >> beam.Map(load_table)
data | "addtional fn" >> beam.Map(additional_function)

It will throw an error like this

TypeError: additional_function() takes 0 positional arguments but 1 was given [while running 'addtional fn']

If I don't pass the output, like this:

data = data | "load to bq" >> beam.Map(load_table)
"addtional fn" >> beam.Map(additional_function)

then additional_function() is not run either.

Upvotes: 2

Views: 1263

Answers (1)

Sergio Lema

Reputation: 1629

Your additional function must be included in the Beam pipeline to be exported to Dataflow.

If you want it to be executed at the end, after "load to bq", try this:

def additional_function(param):
   # all the logic you want
   return None # as it's a map function, you need an input parameter and an output value


data = data | "load to bq" >> beam.Map(load_table)
data | "additional_function" >> beam.Map(additional_function)

Upvotes: 2
