Reputation: 896
Retrying a task may be pointless. For example, if the task is a sensor that failed because it had invalid credentials, any future retries would inevitably fail. How can I define Operators that can decide whether a retry is sensible?
In Airflow 1.10.6, the logic that decides whether a task should be retried lives in airflow.models.taskinstance.TaskInstance.handle_failure, which makes it impossible to define this behavior in the operator, since it is a responsibility of the task instance and not the operator.
Ideally, handle_failure would be defined on the operator side, so we could override it as needed.
The only workaround I found was using BranchPythonOperator to "test" whether the task can run at all. In the case of the sensor above, that means checking whether the login credentials are valid and only then routing the DAG flow to the sensor; otherwise, fail (or branch to another task), roughly as in the sketch below.
Is my analysis of handle_failure correct? Is there a better workaround?
Upvotes: 6
Views: 5139
Reputation: 115
You can prevent Airflow from retrying a given task by raising airflow.exceptions.AirflowFailException:
from airflow.exceptions import AirflowFailException
raise AirflowFailException('Your error message')
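For example, inside a PythonOperator callable, this fails the task immediately while ordinary exceptions still get retried (a minimal sketch, assuming a hypothetical credentials_are_valid() helper and an existing dag object):

from airflow.exceptions import AirflowFailException
from airflow.operators.python_operator import PythonOperator

def poke_s3(**context):
    if not credentials_are_valid():  # hypothetical check
        # Unrecoverable: fail the task right away, skipping any remaining retries
        raise AirflowFailException("Invalid credentials, retrying is pointless")
    # ... otherwise do the actual work; other exceptions are retried as usual ...

sensor_like = PythonOperator(
    task_id="sensor_like",
    python_callable=poke_s3,
    provide_context=True,
    retries=3,
    dag=dag,
)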
Upvotes: 0
Reputation: 896
Answering my own question: by modifying the self.retries instance variable (available in every operator) inside the execute method, we can dynamically force no further retries.
For example:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import BaseOperator


class PseudoSensor(BaseOperator):
    def __init__(
            self,
            s3_status_code_mock,
            *args,
            **kwargs):
        super().__init__(*args, **kwargs)
        self.s3_status_code_mock = s3_status_code_mock

    def execute(self, context):
        # Try to read S3, Redshift, blah blah.
        # The query returns a status code, which we mock when the Sensor is initialized.
        if self.s3_status_code_mock == 0:
            # Success
            return 0
        elif self.s3_status_code_mock == 1:
            # Error, but should retry if I still can
            raise Exception("Retryable error. Won't change retries of operator.")
        elif self.s3_status_code_mock == 2:
            # Unrecoverable error. Should fail without future retries.
            self.retries = 0
            raise Exception("Unrecoverable error. Will set retries to 0.")


# A separate function so we don't make the globals dirty
def createDAG():
    # Default (but overridable) arguments for Operator instantiations
    default_args = {
        'owner': 'Supay',
        'depends_on_past': False,
        'start_date': datetime(2019, 11, 28),
        'retry_delay': timedelta(seconds=1),
        'retries': 3,
    }

    with DAG("dynamic_retries_dag", default_args=default_args, schedule_interval=timedelta(days=1), catchup=False) as dag:
        # Sensor 0: should succeed on the first try
        sensor_0 = PseudoSensor(
            task_id="sensor_0",
            provide_context=True,
            s3_status_code_mock=0,
        )
        # Sensor 1: should fail after exhausting its 3 retries
        sensor_1 = PseudoSensor(
            task_id="sensor_1",
            provide_context=True,
            s3_status_code_mock=1,
        )
        # Sensor 2: should fail after its first try
        sensor_2 = PseudoSensor(
            task_id="sensor_2",
            provide_context=True,
            s3_status_code_mock=2,
        )

    # Tasks declared inside the "with DAG" block are attached to the DAG automatically
    globals()[dag.dag_id] = dag


# Run everything
createDAG()
Gantt showing the tries per task
Upvotes: 5
Reputation: 7815
You can get the corresponding task instance from the context and redefine the number of retries for it, for example:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

default_args = {
    "owner": "Airflow",
    "start_date": datetime(2011, 1, 1, 1, 1),
}

def fun(*, task_instance, **context):
    task_instance.max_tries = 0  # reset the remaining tries to 0
    raise Exception()

with DAG("my_dag", default_args=default_args, catchup=False) as dag:
    op = PythonOperator(
        task_id="my_op",
        python_callable=fun,
        provide_context=True,
        retries=100000,  # set a lot of retries
        retry_delay=timedelta(seconds=1),
    )
Personally, I would not dynamically redefine the number of retries, because it changes the workflow's behavior in a non-obvious way from inside an operator and therefore complicates reasoning about the workflow. I would just let the task fail the configured number of times, regardless of the failure reason. If the retries are expensive, I would reduce their number (e.g., to 1 or 0).
Upvotes: 2