Nagesh B Viswanadham

Reputation: 27

BeamRunPythonPipelineOperator is not submitting the dataflow job

I have written an Apache Beam Python pipeline that reads data from Pub/Sub and inserts it into a database. When I create the template file and submit the Dataflow job manually, it works fine, but when I run it through Composer the template file is created and the Dataflow job is never submitted.

Cloud Composer is in Project A and I want to submit the Dataflow job in Project B. That is why I am using gcp_conn_id in the DAG file.

I have created the DAG file below and checked it in the Airflow UI. The template file is created, but the Dataflow job is not submitted; the task completes without any error. Here is the task log:

Running command: /tmp/apache-beam-venvvxk4qxz7/bin/python /home/airflow/gcs/data/SessPubSubDataFlow.py --runner=DataflowRunner --job_name=start-python-jobdf1-84a0645c --project=abc-temp --region=us-east4 --labels=airflow-version=v2-2-5-composer --template_location=gs://abc-tempstreamsdataflow/Redislookup.txt --temp_location="gs://abc-tempstreamsdataflow/Redislookup.txt"    
[2022-11-08, 09:44:02 UTC] {beam.py:131} INFO - Start waiting for Apache Beam process to complete.
[2022-11-08, 09:44:12 UTC] {beam.py:127} INFO - Hello World
[2022-11-08, 09:44:18 UTC] {beam.py:127} WARNING - WARNING:root:Make sure that locally built Python SDK docker image has Python 3.8 interpreter.
[2022-11-08, 09:44:18 UTC] {beam.py:150} INFO - Process exited with return code: 0
[2022-11-08, 09:44:19 UTC] {dataflow.py:425} INFO - Start waiting for done.
[2022-11-08, 09:44:19 UTC] {dataflow.py:390} INFO - Google Cloud DataFlow job not available yet..
[2022-11-08, 09:44:19 UTC] {taskinstance.py:1279} INFO - Marking task as SUCCESS. dag_id=DataFlowPythonJob, task_id=start-python-jobdf1, execution_date=20221108T094330, start_date=20221108T094333, end_date=20221108T094419
[2022-11-08, 09:44:20 UTC] {local_task_job.py:154} INFO - Task exited with return code 0
[2022-11-08, 09:44:20 UTC] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check
import datetime
from airflow import models
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator

default_args = {"start_date": days_ago(1),
    'retries':0,
    'project':'abc-temp'
}

with models.DAG(
    dag_id="DataFlowPythonJob",
    start_date=days_ago(1),
    default_args=default_args,
    schedule_interval="@once"
) as dag:
    start_python_job = BeamRunPythonPipelineOperator(
        task_id="start-python-jobdf1",
        runner="DataflowRunner",
        py_file="/home/airflow/gcs/data/SessPubSubDataFlow.py",
        py_options=[],
        pipeline_options={
            'template_location': "gs://abc-tempstreamsdataflow/Redislookup.txt",
            'temp_location': "gs://abc-tempstreamsdataflow/",
            'project': "abc-temp",
        },
        py_requirements=['apache-beam[gcp]==2.37.0'],
        py_interpreter='python3',
        py_system_site_packages=False,
        dataflow_config={
            'location': 'us-east4',
            'project_id': 'abc-temp',
            'gcp_conn_id': '0-app',
            'wait_until_finished': False,
            'job_name': '{{task.task_id}}',
        },
    )

Is there any other operator I need to use to submit the Dataflow job?

How do I submit the Dataflow job from Composer?

Upvotes: 0

Views: 644

Answers (1)

Mazlum Tosun

Reputation: 6572

For a Dataflow streaming job, the DAG will instantiate the Dataflow job, but you have to trigger the DAG once in order to run it.

You used the correct and up-to-date operator to create and launch a Dataflow job with Beam/Dataflow Python.
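As an illustration, here is a minimal sketch of launching the pipeline directly with that operator, reusing the paths, project, region and connection id from your question (adjust them as needed). It omits template_location: when that option is set, the Dataflow runner only stages a template file instead of submitting a job.

from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration

# Sketch of a direct launch (no template_location), using values from the question.
start_python_job = BeamRunPythonPipelineOperator(
    task_id="start-python-jobdf1",
    runner="DataflowRunner",
    py_file="/home/airflow/gcs/data/SessPubSubDataFlow.py",
    pipeline_options={"temp_location": "gs://abc-tempstreamsdataflow/"},
    py_requirements=["apache-beam[gcp]==2.37.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
    dataflow_config=DataflowConfiguration(
        job_name="{{ task.task_id }}",
        project_id="abc-temp",
        location="us-east4",
        gcp_conn_id="0-app",  # connection pointing at the target project
        wait_until_finished=False,
    ),
)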

To execute a Dataflow template with Airflow, you have to use the following operator:

from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator

start_template_job = DataflowTemplatedJobStartOperator(
    task_id="start-template-job",
    template='gs://dataflow-templates/latest/Word_Count',
    # GCS_OUTPUT is a placeholder for your output bucket/path
    parameters={'inputFile': "gs://dataflow-samples/shakespeare/kinglear.txt", 'output': GCS_OUTPUT},
    location='europe-west3',
)
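Since your Composer environment and the Dataflow job live in different projects, note that this operator also accepts project_id and gcp_conn_id, so you can point it at the template your pipeline already created; a sketch with the (unverified) values from the question:

start_template_job = DataflowTemplatedJobStartOperator(
    task_id="start-template-job",
    # Template file created by your Beam pipeline run
    template="gs://abc-tempstreamsdataflow/Redislookup.txt",
    project_id="abc-temp",
    location="us-east4",
    gcp_conn_id="0-app",
)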

Upvotes: 2
