recyclinguy

Reputation: 99

Orchestrate Dataflow job using DataflowPythonOperator

Need some advice on running a Dataflow job from Airflow. I can run the Beam wordcount example locally and on the Dataflow runner using the Python SDK, but I could not orchestrate the job via Airflow using DataflowPythonOperator (not sure if it is deprecated). I had no issues orchestrating jar files with DataflowJavaOperator.

example_composer_..._operator = DataFlowPythonOperator(
    gcp_conn_id='gcp_default',
    task_id='composer_dataflow_python_...',
    py_file='gs://dataflow.../WordCountPython.py',
    job_name='Airflow ... Job ',
    py_options=None,
    dataflow_default_options=None,
    options=None,
)
Any advice on how I should approach this issue, and whether I am doing anything wrong, would be appreciated. Should I use the PythonOperator to call the WordCountPython.py file instead?

Cheers

Upvotes: 0

Views: 1367

Answers (1)

Elad Kalif

Reputation: 16099

The DataFlowJavaOperator is deprecated; you should use DataflowCreateJavaJobOperator instead.

You can import the operator from providers. Usage example:

from airflow.providers.google.cloud.operators.dataflow import DataflowCreateJavaJobOperator
task = DataflowCreateJavaJobOperator(
    gcp_conn_id="gcp_default",
    task_id="normalize-cal",
    jar="{{var.value.gcp_dataflow_base}}pipeline-ingress-cal-normalize-1.0.jar",
    options={
        "autoscalingAlgorithm": "BASIC",
        "maxNumWorkers": "50",
        "start": "{{ds}}",
        "partitionType": "DAY",
    },
    dag=dag,
)

If you want the Python version (DataFlowPythonOperator is likewise deprecated), you should use DataflowCreatePythonJobOperator. Usage example:

from airflow.providers.google.cloud.operators.dataflow import DataflowCreatePythonJobOperator

start_python_job = DataflowCreatePythonJobOperator(
    task_id="start-python-job",
    py_file=GCS_PYTHON,
    py_options=[],
    job_name='{{task.task_id}}',
    options={
        'output': GCS_OUTPUT,
    },
    py_requirements=['apache-beam[gcp]==2.21.0'],
    py_interpreter='python3',
    py_system_site_packages=False,
    location='europe-west3',
)
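
For the wordcount pipeline in the question, a minimal DAG sketch is below. This assumes Airflow >= 2.0 with the Google provider installed; the DAG id, bucket path, job name, and region are placeholders, not values from the question. Also note that Dataflow job names may only contain lowercase letters, digits, and hyphens, so a job_name like 'Airflow ... Job ' (with spaces) will be rejected.

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowCreatePythonJobOperator,
)

with DAG(
    dag_id="dataflow_wordcount_python",  # hypothetical DAG id
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,              # trigger manually
    catchup=False,
) as dag:
    wordcount = DataflowCreatePythonJobOperator(
        task_id="wordcount-python",
        gcp_conn_id="gcp_default",
        py_file="gs://your-bucket/WordCountPython.py",  # placeholder path
        job_name="airflow-wordcount",  # lowercase letters, digits, hyphens only
        py_requirements=["apache-beam[gcp]==2.21.0"],
        py_interpreter="python3",
        options={
            "temp_location": "gs://your-bucket/temp/",  # placeholder
        },
        location="us-central1",  # placeholder region
    )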

If you are running Airflow < 2.0.0, you can get these operators by installing the Google backport provider:

pip install apache-airflow-backport-providers-google

If you are running Airflow >= 2.0.0, you can get these operators by installing the Google provider:

pip install apache-airflow-providers-google
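
Either way, a quick sanity check that the operator is importable in your environment (assuming the install succeeded, and noting the backport providers use the same import path) is:

python -c "from airflow.providers.google.cloud.operators.dataflow import DataflowCreatePythonJobOperator"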

Upvotes: 2
