Reputation: 1051
I have three tasks: t1, t2, and t3. Each task's output is the next task's input; for example, t1's output is t2's input. After t1 completes, I sometimes get an empty output folder (this can happen in my case and is acceptable, so t1 is marked as success), but t2 then fails because there are no t1 output files to fetch. I want t2 and t3 to be marked as success when there are no files. How can I skip the next two tasks?
I went through the Airflow docs and other articles and came across sensors and the poke method, but I'm not sure how to proceed with them.
Upvotes: 1
Views: 6751
Reputation: 959
You can leverage a sensor, more specifically the FileSensor, to check whether a file exists. You can then use the soft_fail argument so that the sensor task is marked as "skipped" rather than "failed" when the file does not appear before the sensor times out. This allows the DAG run to succeed while keeping an accurate history of what happened on the file check.
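A minimal sketch of that idea, not a tested setup: the task id, the timing values and the helper names sensor_args and add_output_sensor are made up for illustration, and an fs_default filesystem connection is assumed to exist. The import path differs between Airflow versions, so both are tried. How the sensor treats an empty directory should be verified for your version.

```python
def sensor_args(output_dir):
    """Keyword arguments for a sensor that gives up quietly (hypothetical values)."""
    return {
        "task_id": "wait_for_t1_output",
        "filepath": output_dir,   # resolved against the fs_default connection's base path
        "poke_interval": 30,      # seconds between checks
        "timeout": 120,           # stop checking after two minutes
        "soft_fail": True,        # on timeout, mark the sensor "skipped" instead of "failed"
    }


def add_output_sensor(dag, output_dir):
    """Attach the sensor to an existing DAG; Airflow is imported lazily."""
    try:
        from airflow.sensors.filesystem import FileSensor  # Airflow 2.x
    except ImportError:
        from airflow.contrib.sensors.file_sensor import FileSensor  # Airflow 1.10
    return FileSensor(dag=dag, **sensor_args(output_dir))
```

Placed between t1 and t2 (t1 >> sensor >> t2 >> t3), a skipped sensor should cause t2 and t3 to be skipped as well, as long as they keep the default all_success trigger rule.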
Upvotes: 8
Reputation: 2720
@andscoop's answer is good; this one is only meant to add a few more ideas:
I'm doing something similar (dependencies A > B > C), and I solved it using the XCom that the previous task pushes by default.
"Any value that the execute method returns is saved as an Xcom message under the key return_value. We’ll cover this topic later." Source: http://michal.karzynski.pl/blog/2017/03/19/developing-workflows-with-apache-airflow/
# copy&paste it into dags/stackoverflow.py to test it
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

dag = DAG('stackoverflow', description='Another Dag',
          schedule_interval='* * * 1 1',
          start_date=datetime(2018, 6, 27), catchup=False)


def do_a(**kwargs):
    # Assuming that your TASK A is not returning a value
    return None


task_a = PythonOperator(task_id='do_a',
                        python_callable=do_a,
                        provide_context=True,
                        dag=dag)


def do_b(**kwargs):
    # Pull the value returned by do_a (stored under the key return_value)
    result_from_a = kwargs['ti'].xcom_pull(task_ids='do_a')
    if result_from_a:
        print("Continue with your second task")
    else:
        print("Send a notification somewhere, log something or stop the job here.")


task_b = PythonOperator(task_id='do_b',
                        python_callable=do_b,
                        provide_context=True,
                        dag=dag)

task_a >> task_b
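The else branch above only prints a message, so do_b still ends as a success and any downstream task keeps running. One possible way to actually stop there is to raise AirflowSkipException, which marks the task as skipped. The sketch below is a variant of do_b under the assumption that do_a returns a list of file paths (in the example above it returns None); files_from_a is a hypothetical helper.

```python
def files_from_a(result_from_a):
    """Normalise do_a's XCom value: None or an empty list means no files."""
    return list(result_from_a or [])


def do_b(**kwargs):
    # Pull the value do_a returned (stored under the default key return_value).
    files = files_from_a(kwargs["ti"].xcom_pull(task_ids="do_a"))
    if not files:
        # Imported here so the helper above can be used without Airflow installed.
        from airflow.exceptions import AirflowSkipException

        # Marks do_b as "skipped" instead of "success" or "failed".
        raise AirflowSkipException("do_a produced no files, nothing to do")
    print("Continue with your second task on %d file(s)" % len(files))
    return files
```

With the default all_success trigger rule, a task downstream of a skipped do_b should be skipped too, so the same check does not need to be repeated in the third task.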
Branching: a more sophisticated approach (and closer to best practice) is to branch, choosing the next step/task based on the result of t1. I can't give a proper example right now, but here are 2 sources with examples that show how it works:
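As a rough illustration of the branching idea, here is a sketch using BranchPythonOperator, whose callable returns the task_id to follow; tasks on the branch that is not chosen are skipped. It assumes the same Airflow 1.10-style import paths as the example above. The DAG id, the no_files task and the helpers choose_branch and build_dag are hypothetical names, not taken from the linked sources.

```python
def choose_branch(result_from_a):
    """Return the task_id to run next, based on what do_a produced."""
    return "do_b" if result_from_a else "no_files"


def branch_on_a(**kwargs):
    # Read do_a's return value from XCom and pick the branch.
    return choose_branch(kwargs["ti"].xcom_pull(task_ids="do_a"))


def build_dag():
    """Build the DAG; Airflow is imported lazily inside this function."""
    from datetime import datetime
    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.python_operator import (
        BranchPythonOperator, PythonOperator)

    dag = DAG("stackoverflow_branch", schedule_interval=None,
              start_date=datetime(2018, 6, 27), catchup=False)

    def do_a(**kwargs):
        return None  # stand-in for t1: nothing was produced

    def do_b(**kwargs):
        print("Continue with your second task")

    task_a = PythonOperator(task_id="do_a", python_callable=do_a,
                            provide_context=True, dag=dag)
    branch = BranchPythonOperator(task_id="branch_on_a",
                                  python_callable=branch_on_a,
                                  provide_context=True, dag=dag)
    task_b = PythonOperator(task_id="do_b", python_callable=do_b,
                            provide_context=True, dag=dag)
    no_files = DummyOperator(task_id="no_files", dag=dag)

    task_a >> branch
    branch >> task_b
    branch >> no_files
    return dag
```

In a real DAG file the result would be assigned at module level (dag = build_dag()) so that the scheduler can discover it.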
Upvotes: 1