Devashish Patil

Reputation: 46

Airflow Sensors failing after getting UP_FOR_RESCHEDULE

We have a bunch of Sensor tasks running in reschedule mode with the default poke_interval of 60 seconds. These tasks run perfectly fine for a while, but sometimes they fail, and the last thing I can see in the task log is that they were marked UP_FOR_RESCHEDULE.

...
{taskinstance.py:1464} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
{local_task_job.py:151} INFO - Task exited with return code 0

Ideally, the task status should be UP_FOR_RESCHEDULE, but instead it becomes failed, and even though retries are configured, the task does not retry.
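For context, here is a minimal sketch of the kind of sensor task we run; the DAG id, the FileSensor, and the file path are placeholders, not our actual code:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.sensors.filesystem import FileSensor

with DAG(
    dag_id="example_reschedule_sensor",      # placeholder DAG id
    start_date=datetime(2022, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    wait_for_file = FileSensor(
        task_id="wait_for_file",             # placeholder task id
        filepath="/data/incoming/file.csv",  # placeholder path
        mode="reschedule",                   # frees the worker slot between pokes
        poke_interval=60,                    # the default poke interval we use
        retries=3,                           # retries are configured, but the task never retries
        retry_delay=timedelta(minutes=5),
    )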

The corresponding scheduler log looks like this:

{scheduler_job.py:1241} ERROR - Executor reports task instance <TaskInstance: DAG_ID.TASK_ID 2022-01-10 04:00:00+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?

Setup: Airflow version 2.1.0, Postgres 9.6 as the metadata database, CeleryExecutor with Redis, hosted on Kubernetes. We also use PgBouncer for connection pooling.

Would really appreciate some help on this one. Thank you

Upvotes: 1

Views: 3119

Answers (2)

Viv

Reputation: 46

The issue was fixed in the 2.3.1 release (see "Add reschedule to the serialized fields for the BaseSensorOperator (#23674)"), so any upgrade to Airflow >= 2.3.1 will fix the issue.

Carl M's answer worked for me and led me to find what changed.
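If it helps, a quick sanity check after the upgrade (just standard Airflow, nothing specific to this issue) is to confirm the version the scheduler and workers are actually running:

import airflow

# Should print 2.3.1 or later once the upgraded version is deployed everywhere.
print(airflow.__version__)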

Upvotes: 0

Carl M

Reputation: 189

I am also experiencing this issue. It started after I upgraded from Airflow 2.2.2 to 2.3.0. The custom class below derives from HttpSensor:

check_stuff = AuthorizedHttpSensor(
    task_id="check_stuff_sensor",
    poke_interval=60 * 10,
    timeout=60 * 60 * 7,
    mode="reschedule",
    retries=50,
    soft_fail=False,
    authorize_conn_id=authorize_conn_id,
    endpoint="stuff",
    http_conn_id="stuff_conn_id",
)

Running this results in the task failing, with these as the last lines of the task log:

[2022-06-14, 02:20:35] {taskinstance.py:1853} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2022-06-14, 02:20:35] {local_task_job.py:156} INFO - Task exited with return code 0

I have the same issue for all kinds of sensors after updating to Airflow 2.3.0. Our temporary solution is to change "reschedule" to "poke", but as you know, that occupies a worker slot for a long time if the sensor does not succeed quickly. I will post a real fix as soon as I find something better than this temporary one :)
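In case it is useful, here is a minimal sketch of that temporary workaround using the stock HttpSensor (the connection id and endpoint are placeholders; our real sensor is a custom subclass):

from airflow.providers.http.sensors.http import HttpSensor

check_stuff = HttpSensor(
    task_id="check_stuff_sensor",
    http_conn_id="stuff_conn_id",  # placeholder connection id
    endpoint="stuff",              # placeholder endpoint
    poke_interval=60 * 10,
    timeout=60 * 60 * 7,
    mode="poke",                   # workaround: "poke" avoids the reschedule bug,
    retries=50,                    # but holds a worker slot between pokes
    soft_fail=False,
)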

EDIT: Updating to Airflow 2.3.2 worked for me!

Upvotes: 2
