y2k-shubham

Reputation: 11607

Airflow DAG explodes with RecursionError when triggered via WebUI

After several days of rewriting, I was able to generate my DAGs in the said way (programmatically).


However, upon trying to trigger the DAG via the UI (while testing, I set schedule_interval=None so triggers are manual), the DAG explodes (the nuclear-bomb error image) with the following stacktrace (see the complete stacktrace here), likely thrown from this place:

  recurse_nodes(t, visited) for t in task.upstream_list
  File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/www/views.py", line 1483, in <listcomp>
    if node_count[0] < node_limit or t not in visited]
  File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/www/views.py", line 1478, in recurse_nodes
    visited.add(task)
  File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/models/__init__.py", line 2304, in __hash__
    hash(val)
  File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/models/__init__.py", line 2304, in __hash__
    hash(val)
  File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/models/__init__.py", line 2304, in __hash__
    hash(val)
  [Previous line repeated 477 more times]
  File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/models/__init__.py", line 2302, in __hash__
    val = getattr(self, c, None)
  File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/models/__init__.py", line 2396, in dag_id
    if self.has_dag():
  File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/models/__init__.py", line 2392, in has_dag
    return getattr(self, '_dag', None) is not None
RecursionError: maximum recursion depth exceeded while calling a Python object

Additionally, when I reload the web UI (or open it in a different browser tab), I can see the DAG stuck in the RUNNING state (all its TaskInstances have state / PID = NULL).



Interestingly enough, once this happens, until I mark the DAG as failed via the UI, I keep getting the following stacktrace in /logs/scheduler/latest/<path/to/my/dag_script.py.log> (apparently thrown from here):


[2019-12-24 06:27:32,851] {jobs.py:1446} INFO - Processing derived_tables_presto_cbot_events_1
[2019-12-24 06:27:32,855] {jobs.py:921} INFO - Examining DAG run <DagRun derived_tables_presto_cbot_events_1 @ 2019-12-23 13:08:36.747641+00:00: manual__2019-12-23T13:08:36.747641+00:00, externally triggered: True>
[2019-12-24 06:27:32,856] {jobs.py:410} ERROR - Got an exception! Propagating...
Traceback (most recent call last):
  File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/jobs.py", line 402, in helper
    pickle_dags)
  File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/utils/db.py", line 73, in wrapper
    return func(*args, **kwargs)
  File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/jobs.py", line 1760, in process_file
    self._process_dags(dagbag, dags, ti_keys_to_schedule)
  File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/jobs.py", line 1451, in _process_dags
    self._process_task_instances(dag, tis_out)
  File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/utils/db.py", line 73, in wrapper
    return func(*args, **kwargs)
  File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/jobs.py", line 930, in _process_task_instances
    if len(active_dag_runs) >= dag.max_active_runs:
TypeError: '>=' not supported between instances of 'int' and 'NoneType'
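The comparison that crashes the scheduler can be reproduced in isolation; a minimal sketch (the variable names mirror the traceback; the fallback limit of 16 is my assumption about Airflow's default, not taken from its code):

```python
# Minimal reproduction of the failing comparison in _process_task_instances.
active_dag_runs = ["run1"]   # stand-in for the DagRuns being examined
max_active_runs = None       # value read back as NULL from the meta-db

try:
    if len(active_dag_runs) >= max_active_runs:
        pass
except TypeError as e:
    print(e)  # Python 3 refuses to order int against None

# A defensive guard sidesteps the crash (16 assumed as a fallback limit):
limit = max_active_runs if max_active_runs is not None else 16
print(len(active_dag_runs) >= limit)
```

Unlike Python 2, which silently ordered mixed types, Python 3 raises TypeError here, which is why the NULL in the meta-db surfaces as a scheduler crash.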

Once I mark the DAG as failed via the web UI, the above stacktrace disappears from the scheduler logs (though the TaskInstances of that DAG continue to exist in Airflow's meta-db unless I manually delete them with a DELETE FROM.. SQL query).



I've tried several things without success


I'm using

Upvotes: 0

Views: 6906

Answers (1)

y2k-shubham

Reputation: 11607

As rightly pointed out by @kaxil, the bug was in one of my custom operators.


I had carelessly used a method defined within the operator's class as on_failure_callback; the bound self parameter of that kill_task() method was leading to the RecursionError and the exploding DAG:

class MyCustomOperator(BaseOperator):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.on_failure_callback = self.kill_task  # bound method: keeps a reference back to self

    ...

    def kill_task(self) -> None:
        # do some cleanup work

Moving the kill_task() method outside the class (to module level) resolved the issue.
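The shape of the fix can be sketched with the same plain-Python stand-ins (kill_task and the attribute-walking __hash__ are illustrative, not Airflow's actual code):

```python
def kill_task(context=None):
    # Module-level callback: holds no reference back to the operator instance.
    pass

class Task:
    def __init__(self):
        self.on_failure_callback = kill_task  # plain function, no cycle

    def __hash__(self):
        total = 0
        for val in self.__dict__.values():
            total ^= hash(val)  # hash(kill_task) is just the function's own hash
        return total

print(isinstance(hash(Task()), int))  # hashing now terminates
```

Because the module-level function carries no `__self__`, hashing the operator's attributes no longer loops back into the operator itself.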

Upvotes: 1
