Jh123

Reputation: 93

Pass output from one workflow step to another in GCP

I am trying to orchestrate a GCP workflow that first runs a query in BigQuery to get some metadata (name & id), which is then passed to another step in the workflow that starts a Dataflow job with those parameters as input.

So step by step I want something like:

  1. result = Query("SELECT id & name FROM BigQuery table")
  2. Start Dataflow job: Input(result)

Is this possible or is there a better solution?

Upvotes: 0

Views: 1032

Answers (1)

Mazlum Tosun

Reputation: 6572

I propose two solutions; I hope they help.

- Solution 1: if you have an orchestrator like Airflow in Cloud Composer:

  • Use a task with a BigQueryInsertJobOperator in Airflow; this operator allows you to execute a query against BigQuery
  • Pass the result to a second operator via XCom
  • The second operator is one that extends BeamRunPythonPipelineOperator
  • When you extend BeamRunPythonPipelineOperator, you override the execute method. In this method, you can recover the data from the previous operator as a Dict via xcom_pull
  • Pass this Dict as pipeline options to your operator that extends BeamRunPythonPipelineOperator
  • The BeamRunPythonPipelineOperator will launch your Dataflow job (see the DAG wiring sketch after the operator example below)

An example of an operator with an overridden execute method:

from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration


class CustomBeamOperator(BeamRunPythonPipelineOperator):

    def __init__(
            self,
            your_field,
            ...
            **kwargs) -> None:
        super().__init__(**kwargs)
        self.your_field = your_field
        ...

    def execute(self, context):
        # Recover the result of the previous BigQuery task from XCom as a Dict.
        task_instance = context['task_instance']
        your_conf_from_bq = task_instance.xcom_pull('task_id_previous_operator')

        # Build a Beam operator with the BigQuery result as pipeline options
        # and delegate to it to launch the Dataflow job.
        operator = BeamRunPythonPipelineOperator(
            runner='DataflowRunner',
            py_file='your_dataflow_main_file.py',
            task_id='launch_dataflow_job',
            pipeline_options=your_conf_from_bq,
            py_system_site_packages=False,
            py_interpreter='python3',
            dataflow_config=DataflowConfiguration(
                location='your_region'
            )
        )

        operator.execute(context)
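
For illustration, a minimal DAG wiring sketch; the project, dataset, table, task ids and your_field value are placeholders, and it assumes the elided parts of CustomBeamOperator are filled in. Note that BigQueryInsertJobOperator pushes the job id to XCom rather than the query rows, so this sketch uses a small PythonOperator with the BigQuery client to return the metadata Dict:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from google.cloud import bigquery


def fetch_metadata():
    # Run the query and return the first row as a Dict; Airflow pushes the
    # return value to XCom under the 'return_value' key.
    client = bigquery.Client()
    rows = client.query(
        "SELECT id, name FROM `your_project.your_dataset.your_table`"
    ).result()
    row = next(iter(rows))
    return {'id': row['id'], 'name': row['name']}


with DAG('bq_to_dataflow', start_date=datetime(2023, 1, 1), schedule_interval=None) as dag:

    # The task id must match the one pulled in CustomBeamOperator.execute.
    get_conf = PythonOperator(
        task_id='task_id_previous_operator',
        python_callable=fetch_metadata,
    )

    launch_job = CustomBeamOperator(
        task_id='custom_beam_task',
        runner='DataflowRunner',
        py_file='your_dataflow_main_file.py',
        your_field='your_value',
    )

    get_conf >> launch_job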

- Solution 2:

If you don't have an orchestrator like Airflow:

  • You can use the same virtual env that launches your actual Dataflow job, but add the Python BigQuery client as a package: https://cloud.google.com/bigquery/docs/reference/libraries
  • Create a main Python file that retrieves your conf from the BigQuery table as a Dict via the BigQuery client
  • Generate with Python the command line that launches your Dataflow job with the conf retrieved from the database (see the Python sketch after the command), for example:
python -m folder.your_main_file \
        --runner=DataflowRunner \
        --conf1=conf1 \
        --conf2=conf2 \
        ... \
        --setup_file=./your_setup.py
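
A minimal sketch of what that main Python file could look like; the project, dataset, table and the conf1/conf2 option names are placeholders to adapt to your pipeline:

import subprocess

from google.cloud import bigquery


def fetch_conf() -> dict:
    # Retrieve the metadata row from BigQuery (placeholder table/columns).
    client = bigquery.Client()
    rows = client.query(
        "SELECT id, name FROM `your_project.your_dataset.your_table`"
    ).result()
    row = next(iter(rows))
    return {'conf1': row['id'], 'conf2': row['name']}


def main():
    conf = fetch_conf()
    # Build and run the launch command shown above with the retrieved conf.
    command = [
        'python', '-m', 'folder.your_main_file',
        '--runner=DataflowRunner',
        f"--conf1={conf['conf1']}",
        f"--conf2={conf['conf2']}",
        '--setup_file=./your_setup.py',
    ]
    subprocess.run(command, check=True)


if __name__ == '__main__':
    main()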

I think the solution with Airflow is easier.

Upvotes: 1
