Venkata Gogu

Reputation: 1051

Stop performing remaining tasks in airflow

I have three tasks: t1, t2, and t3. Each task's output is the next task's input; e.g., t1's output is t2's input. After t1 completes, I sometimes get an empty output folder (this can happen in my case and is acceptable, so t1 is marked as success), but t2 then fails because there are no files to fetch from t1's output. I want t2 and t3 to be marked as success if there are no files. How can I skip the next two tasks?


I went through the Airflow docs and other articles and came across sensors and the poke method, but I'm not sure how to proceed with them.

Upvotes: 1

Views: 6751

Answers (2)

andscoop

Reputation: 959

You can leverage a sensor, more specifically the FileSensor, to check whether a file exists. You can then use the soft_fail argument so that the sensor task is marked as "skipped" rather than "failed" when the file does not exist; with the default trigger rule, the tasks downstream of it are skipped as well. This allows the DAG to succeed while keeping an accurate history of what happened at the file check.
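As a rough sketch of this idea, assuming t1 writes to a local directory: the check below returns True once the directory contains at least one file, and a sensor created with soft_fail=True polls it. The names output_has_files, make_output_sensor, and check_t1_output are hypothetical, and so are the poke_interval and timeout values. The import path shown is the Airflow 2 one; on Airflow 1.10 the class is expected under airflow.contrib.sensors.python_sensor instead.

```python
import os


def output_has_files(path):
    """Return True if `path` is a directory containing at least one file."""
    if not os.path.isdir(path):
        return False
    # Walk subdirectories too, in case t1 writes nested output.
    for _root, _dirs, files in os.walk(path):
        if files:
            return True
    return False


def make_output_sensor(dag, path):
    """Build a sensor that ends up 'skipped' (not 'failed') if no file appears."""
    # Imported lazily so the helper above can be tried without Airflow installed.
    # Assumption: Airflow 2 import path (see the note above for 1.10).
    from airflow.sensors.python import PythonSensor

    return PythonSensor(
        task_id="check_t1_output",      # hypothetical task id
        python_callable=output_has_files,
        op_kwargs={"path": path},
        poke_interval=30,               # seconds between checks (illustrative)
        timeout=60,                     # give up after this many seconds (illustrative)
        soft_fail=True,                 # on timeout, mark as skipped instead of failed
        dag=dag,
    )
```

The sensor would sit between t1 and t2 (t1 >> sensor >> t2 >> t3), so that a skipped sensor leads to t2 and t3 being skipped under the default trigger rule.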

Upvotes: 8

skozz

Reputation: 2720

@andscoop's answer is good; this is only to offer a few more ideas:

Possible solution 1

I'm doing something similar (dependencies A > B > C) and I've solved it using the XCom that the previous task pushes by default.

"Any value that the execute method returns is saved as an Xcom message under the key return_value. We’ll cover this topic later." Source: http://michal.karzynski.pl/blog/2017/03/19/developing-workflows-with-apache-airflow/

# copy&paste it into dags/stackoverflow.py to test it

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from datetime import datetime


dag = DAG('stackoverflow', description='Another Dag',
          schedule_interval='* * * 1 1',
          start_date=datetime(2018, 6, 27), catchup=False)


def do_a(**kwargs):
    # Assuming that your TASK A is not returning a value
    return None


task_a = PythonOperator(task_id='do_a',
                        python_callable=do_a,
                        provide_context=True,
                        dag=dag)


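def do_b(**kwargs):
    # With only task_ids given, xcom_pull reads the 'return_value' XCom of do_a
    result_from_a = kwargs['ti'].xcom_pull(task_ids='do_a')
    if result_from_a:
        print("Continue with your second task")
    else:
        # do_a returned None above, so this branch is the one that runs
        print("Send a notification somewhere, log something or stop the job here.")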


task_b = PythonOperator(task_id='do_b',
                        python_callable=do_b,
                        provide_context=True,
                        dag=dag)
task_a >> task_b


Possible solution 2

Branching. A more sophisticated approach (and one that follows best practices) is to use a branch to determine the next step/task based on the result of t1. I can't put together a proper example right now, but here are 2 sources to understand how it works with examples:
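As a rough illustration of the branching idea (not taken from those sources), the callable below decides which task to follow by looking at t1's output directory. It is only a sketch: the function name choose_next_task, the task ids t2 and no_files, and the directory path in the comments are hypothetical. It assumes the callable is handed to a BranchPythonOperator, which follows the task whose id the callable returns and skips the other direct downstream tasks.

```python
import os


def choose_next_task(output_dir, **_context):
    """Return the task_id to follow: 't2' if t1 produced files, else 'no_files'."""
    has_files = os.path.isdir(output_dir) and any(
        os.path.isfile(os.path.join(output_dir, name))
        for name in os.listdir(output_dir)
    )
    return "t2" if has_files else "no_files"


# Possible wiring inside the DAG file (assumes Airflow 1.10 import paths, as in
# the example above; 'no_files' would be a DummyOperator that does nothing):
#
#   from airflow.operators.python_operator import BranchPythonOperator
#
#   branch = BranchPythonOperator(
#       task_id="branch_on_t1_output",
#       python_callable=choose_next_task,
#       op_kwargs={"output_dir": "/path/to/t1/output"},  # hypothetical path
#       dag=dag,
#   )
#   t1 >> branch >> [t2, no_files]
#   t2 >> t3
```

When the directory is empty, only no_files runs; t2 is skipped by the branch, and t3 is skipped in turn under the default trigger rule, so the DAG run still succeeds.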

Upvotes: 1
