Reputation: 99
Need some advice on running a Dataflow job from Airflow. I can run the Beam wordcount example locally and on the Dataflow runner using the Python SDK, but I could not orchestrate the job via Airflow using DataFlowPythonOperator (not sure if it is deprecated). I had no issues orchestrating jar files with DataFlowJavaOperator.
example_composer_..._operator = DataFlowPythonOperator(
    gcp_conn_id='gcp_default',
    task_id='composer_dataflow_python_...',
    py_file='gs://dataflow.../WordCountPython.py',
    job_name='Airflow ... Job ',
    py_options=None,
    dataflow_default_options=None,
    options=None,
)
Any advice on how I should approach this issue, and whether I am doing anything wrong? Should I use a PythonOperator to call the WordCountPython.py file instead?
Cheers
Upvotes: 0
Views: 1367
Reputation: 16099
The DataFlowJavaOperator is deprecated; you should use DataflowCreateJavaJobOperator instead.
You can import the operator from the providers package. Usage example:
from airflow.providers.google.cloud.operators.dataflow import DataflowCreateJavaJobOperator
task = DataflowCreateJavaJobOperator(
    gcp_conn_id="gcp_default",
    task_id="normalize-cal",
    jar="{{var.value.gcp_dataflow_base}}pipeline-ingress-cal-normalize-1.0.jar",
    options={
        "autoscalingAlgorithm": "BASIC",
        "maxNumWorkers": "50",
        "start": "{{ds}}",
        "partitionType": "DAY",
    },
    dag=dag,
)
If you want to use the Python version, then you should use DataflowCreatePythonJobOperator. Usage example:
from airflow.providers.google.cloud.operators.dataflow import DataflowCreatePythonJobOperator
start_python_job = DataflowCreatePythonJobOperator(
    task_id="start-python-job",
    py_file=GCS_PYTHON,
    py_options=[],
    job_name='{{task.task_id}}',
    options={
        'output': GCS_OUTPUT,
    },
    py_requirements=['apache-beam[gcp]==2.21.0'],
    py_interpreter='python3',
    py_system_site_packages=False,
    location='europe-west3',
)
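To address the original question directly, here is a minimal DAG sketch showing how such a task can be wired into Airflow. The DAG id, schedule, and the `gs://` paths are placeholders you would replace with your own; the operator parameters mirror the example above.

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowCreatePythonJobOperator,
)

with DAG(
    dag_id="dataflow_python_wordcount",  # hypothetical DAG id
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,  # trigger manually
    catchup=False,
) as dag:
    start_python_job = DataflowCreatePythonJobOperator(
        task_id="start-python-job",
        py_file="gs://my-bucket/WordCountPython.py",  # placeholder path
        py_options=[],
        job_name="{{ task.task_id }}",
        options={"output": "gs://my-bucket/output"},  # placeholder path
        py_requirements=["apache-beam[gcp]==2.21.0"],
        py_interpreter="python3",
        py_system_site_packages=False,
        location="europe-west3",
    )
```

Defining the operator inside the `with DAG(...)` block attaches it to the DAG automatically, so you do not need to pass `dag=dag` explicitly.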
If you are running Airflow < 2.0.0, you can get these operators by installing the Google backport provider:
pip install apache-airflow-backport-providers-google
If you are running Airflow >= 2.0.0, you can get these operators by installing the Google provider:
pip install apache-airflow-providers-google
Upvotes: 2