D.Lee

Reputation: 73

Airflow Task does not move onto dependency but re-runs task

I have an Airflow workflow that consists of three tasks: the second task depends on the first, and the third task depends on the second.

If I run the DAG via the webserver, the first task completes but then begins to re-run instead of triggering the second task. One thing to keep in mind is that the first task does take more than 130 seconds to run. Is this happening because of the duration of the first task?

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta, datetime


default_args = {
    'owner': 'David',
    'depends_on_past': True,
    'start_date': datetime(2018,5,18),
    'email': ['email_address'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'DCM_Floodlight_Report_API',
    default_args=default_args,
    description='Pull ABG DCM Floodlight report. Then upload into S3 bucket.',
    schedule_interval='30 14 * * *')

t1 = BashOperator(
    task_id='Pull_DCM_Report',
    bash_command='python "/Users/run_report.py" 2737542 134267867',
    dag=dag)

t2 = BashOperator(
    task_id='Cleanse_File',
    bash_command='python "/Users/cleanse_file.py"',
    dag=dag)

t3 = BashOperator(
    task_id='S3_Bucket_Creation_Upload_File',
    bash_command='python "/Users/aws_s3_creation&load.py"',
    dag=dag)

# Chain the tasks: t1 -> t2 -> t3
t2.set_upstream(t1)
t3.set_upstream(t2)

Upvotes: 0

Views: 1700

Answers (2)

Zack

Reputation: 2466

Try it without the retry logic and see how it performs. Use these default args and DAG info:

default_args = {
    'owner': 'David',
    'depends_on_past': False,
    'start_date': datetime(2018, 5, 18),
    'email': ['email_address'],
    'email_on_failure': True,
    'email_on_retry': True
}

dag = DAG(
    dag_id='DCM_Floodlight_Report_API',
    default_args=default_args,
    catchup=False,
    description='Pull ABG DCM Floodlight report. Then upload into S3 bucket.',
    schedule_interval='30 14 * * *')

I added catchup and set it to False, and changed depends_on_past to False. I removed the retry logic as well. This may fix your issue; please let me know!
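If you'd rather change this behavior globally instead of per DAG, I believe airflow.cfg also has a catchup_by_default flag under the scheduler section (available since Airflow 1.8, if I remember right) that you can flip:

[scheduler]
catchup_by_default = False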

Upvotes: 0

Pete DeJoy

Reputation: 46

I don't think the runtime of your task is the issue; this behavior is most likely due to the catchup parameter, which defaults to True.

https://airflow.apache.org/scheduler.html#backfill-and-catchup

This means that Airflow is scheduling a DagRun for every schedule interval between your start_date and the current time, so the first task keeps running for each backfilled interval before the downstream tasks in any given run get their turn. You can look at the Tree View in the UI to see whether more than one DagRun is being scheduled. If you're just testing your DAG, I'd recommend setting the schedule_interval to @once, and only then scheduling it to run for dates in the past or future.
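For instance, a minimal test-friendly variant of the DAG from the question might look like the sketch below (the dag_id suffix and the echo command are just placeholders for testing; passing catchup as a DAG argument assumes Airflow 1.8+):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'David',
    'depends_on_past': False,  # don't block a run on the previous run while testing
    'start_date': datetime(2018, 5, 18),
}

# schedule_interval='@once' triggers a single DagRun, and catchup=False stops
# Airflow from backfilling one DagRun per schedule interval since start_date.
dag = DAG(
    'DCM_Floodlight_Report_API_test',
    default_args=default_args,
    schedule_interval='@once',
    catchup=False)

t1 = BashOperator(
    task_id='Pull_DCM_Report',
    bash_command='echo "pull report"',  # stand-in for the real script while testing
    dag=dag)

Once a single DagRun behaves as expected in the Tree View, you can restore the cron schedule_interval.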

Upvotes: 3
