Reputation: 27
I have written an Apache Beam Python pipeline that reads data from Pub/Sub and inserts it into a database. When I create the template file and submit the Dataflow job manually it works fine, but when I do it through Composer the template file gets created and the Dataflow job is not submitted.
Cloud Composer is in Project A and I want to submit the Dataflow job in Project B; that is why I am using gcp_conn_id in the DAG file.
I have created the DAG file below and checked things in the Airflow UI. I am able to create the template file, but the Dataflow job is not submitted and the task completes without any error. Here is the log:
Running command: /tmp/apache-beam-venvvxk4qxz7/bin/python /home/airflow/gcs/data/SessPubSubDataFlow.py --runner=DataflowRunner --job_name=start-python-jobdf1-84a0645c --project=abc-temp --region=us-east4 --labels=airflow-version=v2-2-5-composer --template_location=gs://abc-tempstreamsdataflow/Redislookup.txt --temp_location="gs://abc-tempstreamsdataflow/Redislookup.txt"
[2022-11-08, 09:44:02 UTC] {beam.py:131} INFO - Start waiting for Apache Beam process to complete.
[2022-11-08, 09:44:12 UTC] {beam.py:127} INFO - Hello World
[2022-11-08, 09:44:18 UTC] {beam.py:127} WARNING - WARNING:root:Make sure that locally built Python SDK docker image has Python 3.8 interpreter.
[2022-11-08, 09:44:18 UTC] {beam.py:150} INFO - Process exited with return code: 0
[2022-11-08, 09:44:19 UTC] {dataflow.py:425} INFO - Start waiting for done.
[2022-11-08, 09:44:19 UTC] {dataflow.py:390} INFO - Google Cloud DataFlow job not available yet..
[2022-11-08, 09:44:19 UTC] {taskinstance.py:1279} INFO - Marking task as SUCCESS. dag_id=DataFlowPythonJob, task_id=start-python-jobdf1, execution_date=20221108T094330, start_date=20221108T094333, end_date=20221108T094419
[2022-11-08, 09:44:20 UTC] {local_task_job.py:154} INFO - Task exited with return code 0
[2022-11-08, 09:44:20 UTC] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check
import datetime

from airflow import models
from airflow.utils.dates import days_ago
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator

default_args = {
    "start_date": days_ago(1),
    "retries": 0,
    "project": "abc-temp",
}

with models.DAG(
    dag_id="DataFlowPythonJob",
    start_date=days_ago(1),
    default_args=default_args,
    schedule_interval="@once",
) as dag:
    start_python_job = BeamRunPythonPipelineOperator(
        task_id="start-python-jobdf1",
        runner="DataflowRunner",
        py_file="/home/airflow/gcs/data/SessPubSubDataFlow.py",
        py_options=[],
        pipeline_options={
            "template_location": "gs://abc-tempstreamsdataflow/Redislookup.txt",
            "temp_location": "gs://abc-tempstreamsdataflow/",
            "project": "abc-temp",
        },
        py_requirements=["apache-beam[gcp]==2.37.0"],
        py_interpreter="python3",
        py_system_site_packages=False,
        dataflow_config={
            "location": "us-east4",
            "project_id": "abc-temp",
            "gcp_conn_id": "0- app",
            "wait_until_finished": False,
            "job_name": "{{ task.task_id }}",
        },
    )
Is there any other operator I need to use to submit the Dataflow job?
How do I submit a Dataflow job from Composer?
Upvotes: 0
Views: 644
Reputation: 6572
For a Dataflow streaming job, the DAG will instantiate the Dataflow job, but you have to trigger the DAG in order to run the Dataflow job once.
You used the correct and up-to-date operator to create and launch a Dataflow job with Beam/Dataflow Python.
To execute a Dataflow template with Airflow, you have to use the following operator:
from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator

# GCS_OUTPUT is a GCS path where the Word_Count template writes its results,
# e.g. "gs://your-bucket/wordcount/output".
start_template_job = DataflowTemplatedJobStartOperator(
    task_id="start-template-job",
    template='gs://dataflow-templates/latest/Word_Count',
    parameters={'inputFile': "gs://dataflow-samples/shakespeare/kinglear.txt", 'output': GCS_OUTPUT},
    location='europe-west3',
)
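Applied to your case, a minimal sketch could look like this. It assumes the classic template you generated at gs://abc-tempstreamsdataflow/Redislookup.txt, the abc-temp project, the us-east4 region and your cross-project connection 0- app (all taken from your question); the empty parameters dict is a placeholder for whatever runtime parameters your pipeline defines.
# Minimal sketch, not a drop-in solution: template path, project, region and
# connection ID are assumed from the question; fill in parameters as needed.
run_redis_lookup_template = DataflowTemplatedJobStartOperator(
    task_id="run-redis-lookup-template",
    template="gs://abc-tempstreamsdataflow/Redislookup.txt",
    project_id="abc-temp",
    location="us-east4",
    gcp_conn_id="0- app",
    parameters={},  # add your pipeline's runtime parameters here
)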
Upvotes: 2