Reputation: 73
I have an Airflow workflow that consists of three tasks: the second task depends on the first, and the third depends on the second.
If I run the DAG via the webserver, the first task completes but then begins to re-run instead of triggering the second task. One thing to keep in mind is that the first task does take more than 130 seconds to run. Is this happening because of the duration of the first task?
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta, datetime

default_args = {
    'owner': 'David',
    'depends_on_past': True,
    'start_date': datetime(2018, 5, 18),
    'email': ['email_address'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'DCM_Floodlight_Report_API',
    default_args=default_args,
    description='Pull ABG DCM Floodlight report. Then upload into S3 bucket.',
    schedule_interval='30 14 * * *')

t1 = BashOperator(
    task_id='Pull_DCM_Report',
    bash_command='python "/Users/run_report.py" 2737542 134267867',
    dag=dag)

t2 = BashOperator(
    task_id='Cleanse_File',
    bash_command='python "/Users/cleanse_file.py"',
    dag=dag)

t3 = BashOperator(
    task_id='S3_Bucket_Creation_Upload_File',
    bash_command='python "/Users/aws_s3_creation&load.py"',
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t2)
Upvotes: 0
Views: 1700
Reputation: 2466
Try it without the retry logic and see how it performs. Use these default args and this DAG definition:
default_args = {
    'owner': 'David',
    'depends_on_past': False,
    'start_date': datetime(2018, 5, 18),
    'email': ['email_address'],
    'email_on_failure': True,
    'email_on_retry': True,
}

dag = DAG(
    dag_id='DCM_Floodlight_Report_API',
    default_args=default_args,
    catchup=False,
    description='Pull ABG DCM Floodlight report. Then upload into S3 bucket.',
    schedule_interval='30 14 * * *')
I added catchup and set it to False, and changed depends_on_past to False. I removed the retry logic as well. This may fix your issue - please let me know!
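For reference, a minimal sketch of how the whole file would look with those changes applied - the three tasks are unchanged from your question, and the chaining uses the bitshift syntax, which is equivalent to your set_upstream calls:

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'David',
    'depends_on_past': False,   # don't block a run on the previous run's status
    'start_date': datetime(2018, 5, 18),
    'email': ['email_address'],
    'email_on_failure': True,
    'email_on_retry': True,
}

dag = DAG(
    dag_id='DCM_Floodlight_Report_API',
    default_args=default_args,
    catchup=False,              # don't backfill intervals between start_date and now
    description='Pull ABG DCM Floodlight report. Then upload into S3 bucket.',
    schedule_interval='30 14 * * *')

t1 = BashOperator(
    task_id='Pull_DCM_Report',
    bash_command='python "/Users/run_report.py" 2737542 134267867',
    dag=dag)

t2 = BashOperator(
    task_id='Cleanse_File',
    bash_command='python "/Users/cleanse_file.py"',
    dag=dag)

t3 = BashOperator(
    task_id='S3_Bucket_Creation_Upload_File',
    bash_command='python "/Users/aws_s3_creation&load.py"',
    dag=dag)

t1 >> t2 >> t3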
Upvotes: 0
Reputation: 46
I don't think the runtime of your task is an issue - this behavior is most likely due to the catchup parameter, which defaults to True.
https://airflow.apache.org/scheduler.html#backfill-and-catchup
This means that Airflow is scheduling the first task for every schedule interval between your start_date and the current time.
You can look at your Tree View in the UI to see if more than one DagRun is being scheduled. If you're just testing your DAG, I'd recommend setting the schedule_interval to @once when testing, before scheduling it to run for dates in the past or future.
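As a quick sketch, both of those changes go on the DAG object itself (default_args trimmed here to just the required start_date; the rest stays as in your question):

from datetime import datetime
from airflow import DAG

default_args = {
    'owner': 'David',
    'start_date': datetime(2018, 5, 18),
}

dag = DAG(
    dag_id='DCM_Floodlight_Report_API',
    default_args=default_args,
    schedule_interval='@once',  # handy while testing; swap back to '30 14 * * *' once verified
    catchup=False,              # stops the scheduler from backfilling past intervals
)

There's also a catchup_by_default option in airflow.cfg if you want False to be the default for every DAG.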
Upvotes: 3