Alechan

Reputation: 896

Dynamically change the number of task retries

Retrying a task may be pointless. For example, if the task is a sensor and it failed because it had invalid credentials, then any future retries would inevitably fail. How can I define Operators that can decide whether a retry is sensible?

In Airflow 1.10.6, the logic that decides whether a task should be retried lives in airflow.models.taskinstance.TaskInstance.handle_failure, making it impossible to define the behavior in the operator, as it is a responsibility of the task instance and not the operator.
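
Roughly, that decision compares the task instance's try count against its allowed maximum; a paraphrased sketch of the idea (not the actual Airflow source) looks like this:

# Paraphrased sketch of the retry decision in TaskInstance.handle_failure (Airflow 1.10.x)
if task.retries and task_instance.try_number <= task_instance.max_tries:
    task_instance.state = State.UP_FOR_RETRY
else:
    task_instance.state = State.FAILED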

An ideal solution would be if the handle_failure method were defined on the Operator's side, so we could redefine it as needed.

The only workaround I found was using BranchPythonOperator to "test" whether the task can be run at all. For example, in the case of the sensor above, checking whether the login credentials are valid and only then routing the DAG flow to the sensor; otherwise, failing (or branching to another task).
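
As a rough illustration of that workaround (a minimal sketch with hypothetical task ids and a hypothetical validate_credentials() helper; DummyOperator stands in for the real sensor and failure-handling tasks):

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator


def validate_credentials():
    # Hypothetical check; replace with a real call to the external system.
    return False


def choose_path(**context):
    # Route the flow to the sensor only when the credentials look valid.
    return "my_sensor" if validate_credentials() else "handle_bad_credentials"


with DAG("branching_workaround_dag",
         start_date=datetime(2019, 11, 28),
         schedule_interval=None,
         catchup=False) as dag:
    check_credentials = BranchPythonOperator(
        task_id="check_credentials",
        python_callable=choose_path,
        provide_context=True,
    )
    my_sensor = DummyOperator(task_id="my_sensor")  # stand-in for the real sensor
    handle_bad_credentials = DummyOperator(task_id="handle_bad_credentials")

    check_credentials >> [my_sensor, handle_bad_credentials]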

Is my analysis of handle_failure correct? Is there a better workaround?

Upvotes: 6

Views: 5139

Answers (3)

Daniel Ortega

Reputation: 115

You can prevent Airflow from retrying a given task by raising airflow.exceptions.AirflowFailException:

from airflow.exceptions import AirflowFailException
raise AirflowFailException('Your error message')
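
For example, inside a PythonOperator callable (a minimal sketch; the DAG, task id, and credential check are hypothetical), raising the exception fails the task immediately even though retries are configured:

from datetime import datetime

from airflow import DAG
from airflow.exceptions import AirflowFailException
from airflow.operators.python_operator import PythonOperator


def check_credentials(**context):
    # Hypothetical check; replace with real credential validation.
    credentials_valid = False
    if not credentials_valid:
        # Marks the task instance FAILED right away; the configured retries are skipped.
        raise AirflowFailException("Invalid credentials, retrying is pointless.")


with DAG("fail_fast_dag",
         start_date=datetime(2019, 11, 28),
         schedule_interval=None,
         catchup=False) as dag:
    check = PythonOperator(
        task_id="check_credentials",
        python_callable=check_credentials,
        provide_context=True,
        retries=3,  # has no effect once AirflowFailException is raised
    )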

Upvotes: 0

Alechan

Reputation: 896

Answering my own question: by modifying the self.retries instance variable (available in all operators) inside the execute method, we can dynamically force no more retries.

In the following example:

  1. Sensor 0: will succeed on the first try
  2. Sensor 1: will fail after 4 tries (1 initial try + 3 retries)
  3. Sensor 2: will fail after 1 try (retries dynamically forced to 0)

from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import BaseOperator


class PseudoSensor(BaseOperator):
    def __init__(
            self,
            s3_status_code_mock,
            *args,
            **kwargs):
        super().__init__(*args, **kwargs)
        self.s3_status_code_mock = s3_status_code_mock

    def execute(self, context):
        # Try to read from S3, Redshift, etc.
        # The query returns a status code, which we mock when the Sensor is instantiated.
        if self.s3_status_code_mock == 0:
            # Success
            return 0
        elif self.s3_status_code_mock == 1:
            # Retryable error; keep retrying while retries remain.
            raise Exception("Retryable error. Won't change retries of operator.")
        elif self.s3_status_code_mock == 2:
            # Unrecoverable error. Should fail without future retries.
            self.retries = 0
            raise Exception("Unrecoverable error. Will set retries to 0.")


# A separate function so we don't pollute the global namespace
def createDAG():
    # Default (but overridable) arguments for Operators instantiations
    default_args = {
        'owner': 'Supay',
        'depends_on_past': False,
        'start_date': datetime(2019, 11, 28),
        'retry_delay': timedelta(seconds=1),
        'retries': 3,
    }

    with DAG("dynamic_retries_dag", default_args=default_args, schedule_interval=timedelta(days=1), catchup=False) as dag:
        # Sensor 0: should succeed on the first try
        sensor_0 = PseudoSensor(
            task_id="sensor_0",
            provide_context=True,
            s3_status_code_mock=0,
        )

        # Sensor 1: should fail after 4 tries (1 initial try + 3 retries)
        sensor_1 = PseudoSensor(
            task_id="sensor_1",
            provide_context=True,
            s3_status_code_mock=1
        )

        # Sensor 2: should fail after 1 try
        sensor_2 = PseudoSensor(
            task_id="sensor_2",
            provide_context=True,
            s3_status_code_mock=2
        )

        dag >> sensor_0
        dag >> sensor_1
        dag >> sensor_2

        globals()[dag.dag_id] = dag


# Run everything
createDAG()

Gantt chart showing the tries per task.

Upvotes: 5

SergiyKolesnikov

Reputation: 7815

You can get the corresponding task instance from the context and redefine the number of retries for it, for example:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator


default_args = {
    "owner": "Airflow",
    "start_date": datetime(2011, 1, 1, 1, 1),
}


def fun(*, task_instance, **context):
    task_instance.max_tries = 0  # no further tries allowed; the task fails for good
    raise Exception()


with DAG("my_dag", default_args=default_args, catchup=False) as dag:
    op = PythonOperator(
        task_id="my_op",
        python_callable=fun,
        provide_context=True,
        retries=100000,  # set a lot of retries
        retry_delay=timedelta(seconds=1),
    )

I personally would not dynamically redefine the number of retries, because it changes the workflow behavior in a non-obvious way from inside an operator and therefore complicates reasoning about the workflow. I would just let the task fail the set number of times, independently of the failure reason. If the retries are expensive, I would reduce their number (e.g., to 1 or 0).

Upvotes: 2
