Reputation: 46
We have a number of Sensor tasks running in reschedule mode with the default poke_interval of 60 seconds. These tasks run fine for some time but occasionally fail, and the last log line I can see is that they are UP_FOR_RESCHEDULE.
...
{taskinstance.py:1464} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
{local_task_job.py:151} INFO - Task exited with return code 0
The task status should remain UP_FOR_RESCHEDULE, but it is instead marked failed, and even with retries configured the task is not retried.
The corresponding scheduler logs look like this:
{scheduler_job.py:1241} ERROR - Executor reports task instance <TaskInstance: DAG_ID.TASK_ID 2022-01-10 04:00:00+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
Setup: Airflow version: 2.1.0. Database: PostgreSQL 9.6. Using CeleryExecutor with Redis, hosted on Kubernetes, with PgBouncer for connection pooling.
Would really appreciate some help on this one. Thank you
Upvotes: 1
Views: 3119
Reputation: 46
The issue was fixed in the 2.3.1 release (see "Add reschedule to the serialized fields for the BaseSensorOperator (#23674)"). Upgrading to Airflow 2.3.1 or later fixes the issue.
Carl M's answer worked for me and led me to find what changed.
Upvotes: 0
Reputation: 189
I am also experiencing this issue. It started after I upgraded from Airflow 2.2.2 to 2.3.0. This custom class derives from HttpSensor.
check_stuff = AuthorizedHttpSensor(
    task_id="check_stuff_sensor",
    poke_interval=60 * 10,
    timeout=60 * 60 * 7,
    mode="reschedule",
    retries=50,
    soft_fail=False,
    authorize_conn_id=authorize_conn_id,
    endpoint="stuff",
    http_conn_id="stuff_conn_id",
)
Results in the following error:
[2022-06-14, 02:20:35] {taskinstance.py:1853} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2022-06-14, 02:20:35] {local_task_job.py:156} INFO - Task exited with return code 0
I have the same issue with all kinds of sensors after updating to Airflow 2.3.0. Our temporary workaround is to change mode from "reschedule" to "poke", but as you know, the sensor then occupies a pool slot for as long as it does not succeed. I will post a real fix as soon as I find something better than this workaround :)
EDIT: Updating to Airflow 2.3.2 worked for me!
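If you cannot upgrade right away, the "poke" workaround can be limited to the affected release so that it falls away on its own after an upgrade. This is only a rough sketch: choose_sensor_mode, parse_version and AFFECTED_VERSIONS are hypothetical names made up for illustration, not part of Airflow, and the assumption that only 2.3.0 is affected comes from the reports in this thread rather than from any official list.

```python
import re

# Assumption: only Airflow 2.3.0 is affected, as reported in this thread.
# Add other versions here if you observe the same failure on them.
AFFECTED_VERSIONS = {(2, 3, 0)}


def parse_version(version: str) -> tuple:
    """Turn a string such as '2.3.0' or '2.3.1rc1' into (major, minor, patch)."""
    match = re.match(r"(\d+)\.(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError(f"Unrecognised version string: {version!r}")
    return tuple(int(part) for part in match.groups())


def choose_sensor_mode(airflow_version: str) -> str:
    """Return 'poke' on affected versions and 'reschedule' otherwise."""
    if parse_version(airflow_version) in AFFECTED_VERSIONS:
        return "poke"
    return "reschedule"
```

In a DAG file you would pass the result as the sensor's mode argument, for example mode=choose_sensor_mode(airflow.__version__). Keep in mind that "poke" still ties up a slot for the whole wait, so this only makes sense as a stopgap until the upgrade.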
Upvotes: 2