Reputation: 93
I am trying to orchestrate a GCP workflow that first runs a query in BigQuery to get some metadata (name & id), which is then passed to another step in the workflow that starts a Dataflow job with those parameters as input.
So step by step I want something like:
1. Run a BigQuery query that returns the metadata (name & id).
2. Start a Dataflow job that takes that metadata as input parameters.
Is this possible or is there a better solution?
Upvotes: 0
Views: 1032
Reputation: 6572
I propose two solutions and I hope one of them helps.
- Solution 1:

If you have an orchestrator like Airflow in Cloud Composer:

Use the BigQueryInsertJobOperator in Airflow; this operator allows you to execute a query in BigQuery and share the result with downstream tasks via xcom.

Create your own operator that extends BeamRunPythonPipelineOperator and override its execute method. In this method, you can recover the data from the previous operator via xcom pull as a Dict, and pass this Dict as pipeline options to the BeamRunPythonPipelineOperator, which will launch your Dataflow job.

An example of an operator with an overridden execute method:
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration


class CustomBeamOperator(BeamRunPythonPipelineOperator):

    def __init__(self, your_field, **kwargs) -> None:
        super().__init__(**kwargs)
        self.your_field = your_field
        # ... any other custom fields

    def execute(self, context):
        # Recover the conf pushed to xcom by the previous task.
        task_instance = context['task_instance']
        your_conf_from_bq = task_instance.xcom_pull('task_id_previous_operator')

        # Build a Beam operator with the conf from BigQuery as pipeline
        # options and delegate to it: this launches the Dataflow job.
        operator = BeamRunPythonPipelineOperator(
            runner='DataflowRunner',
            py_file='your_dataflow_main_file.py',
            task_id='launch_dataflow_job',
            pipeline_options=your_conf_from_bq,
            py_system_site_packages=False,
            py_interpreter='python3',
            dataflow_config=DataflowConfiguration(
                location='your_region'
            )
        )
        operator.execute(context)
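For reference, a minimal sketch of a DAG wiring the two tasks together. One caveat: BigQueryInsertJobOperator pushes its BigQuery job id to xcom rather than the query rows, so this sketch uses a plain PythonOperator with the BigQuery client, whose returned Dict is what Airflow pushes to xcom. The table, fields, and module path are hypothetical:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from google.cloud import bigquery

# Hypothetical module path for the CustomBeamOperator defined above.
from your_package.custom_beam_operator import CustomBeamOperator


def fetch_metadata():
    # The returned Dict is pushed to xcom under the key 'return_value'.
    client = bigquery.Client()
    row = next(iter(client.query(
        "SELECT name, id FROM your_dataset.your_metadata LIMIT 1").result()))
    return {'name': row['name'], 'id': str(row['id'])}


with DAG(
    dag_id='bq_then_dataflow',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    get_conf = PythonOperator(
        # Task id matches the xcom_pull in CustomBeamOperator.execute.
        task_id='task_id_previous_operator',
        python_callable=fetch_metadata,
    )

    launch = CustomBeamOperator(
        task_id='launch_custom_beam_job',
        your_field='some_value',
        py_file='your_dataflow_main_file.py',
    )

    get_conf >> launch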
- Solution 2:

If you don't have an orchestrator like Airflow:

Keep your Dataflow job but add the Python BigQuery client as a package: https://cloud.google.com/bigquery/docs/reference/libraries

Then add a Python file that retrieves your conf from the BigQuery table as a Dict via the BigQuery client.
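For example, a minimal sketch of such a file, assuming a hypothetical your_dataset.your_metadata table with name and id columns:

from google.cloud import bigquery


def get_conf_from_bq() -> dict:
    # Read the metadata row from BigQuery and return it as a Dict
    # that can be turned into pipeline options for the Dataflow job.
    client = bigquery.Client()
    query = "SELECT name, id FROM your_dataset.your_metadata LIMIT 1"
    row = next(iter(client.query(query).result()))
    return {'conf1': row['name'], 'conf2': str(row['id'])}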
Launch the Dataflow job with a command like:

python -m folder.your_main_file \
    --runner=DataflowRunner \
    --conf1=conf1 \
    --conf2=conf2 \
    ....
    --setup_file=./your_setup.py
You can launch this Python command with a Python subprocess.
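For instance, a sketch that builds the flags from the Dict returned by the BigQuery client (the module path and option names are hypothetical):

import subprocess

# Hypothetical helper from the retrieval file above.
from your_package.bq_conf import get_conf_from_bq

conf = get_conf_from_bq()
cmd = [
    'python', '-m', 'folder.your_main_file',
    '--runner=DataflowRunner',
    f"--conf1={conf['conf1']}",
    f"--conf2={conf['conf2']}",
    '--setup_file=./your_setup.py',
]
# check=True raises CalledProcessError if the launch command fails.
subprocess.run(cmd, check=True)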
Or you can use the Dataflow client to launch the Dataflow job: https://pypi.org/project/google-cloud-dataflow-client/ (I didn't try it).

I think the solution with Airflow is easier.
Upvotes: 1