Reputation: 11607
I'm using LocalExecutor since all my tasks merely execute commands on remote machines (no actual computation heavy-lifting is done on the Airflow machine itself). After several days of rewriting, I was able to generate my DAGs in the said way (programmatically); a rough sketch of the generation pattern follows the stacktrace below.
However, upon trying to trigger the DAG via the UI (while testing, I set schedule_interval=None, so triggers are manual), the DAG explodes (the nuclear-bomb error image) with the following stacktrace (see the complete stacktrace here), likely thrown from this place:
    recurse_nodes(t, visited) for t in task.upstream_list
  File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/www/views.py", line 1483, in <listcomp>
    if node_count[0] < node_limit or t not in visited]
  File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/www/views.py", line 1478, in recurse_nodes
    visited.add(task)
  File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/models/__init__.py", line 2304, in __hash__
    hash(val)
  File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/models/__init__.py", line 2304, in __hash__
    hash(val)
  File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/models/__init__.py", line 2304, in __hash__
    hash(val)
  [Previous line repeated 477 more times]
  File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/models/__init__.py", line 2302, in __hash__
    val = getattr(self, c, None)
  File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/models/__init__.py", line 2396, in dag_id
    if self.has_dag():
  File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/models/__init__.py", line 2392, in has_dag
    return getattr(self, '_dag', None) is not None
RecursionError: maximum recursion depth exceeded while calling a Python object
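For reference, the programmatic generation looks roughly like the sketch below. This is a hypothetical reconstruction, not my actual code: the task specs, the commands, and the choice of BashOperator are illustrative assumptions.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator  # Airflow 1.10.x import path

# hypothetical spec: task_id -> command to run on a remote machine
TASK_SPECS = {
    "extract_events": "ssh remote-host /opt/jobs/extract_events.sh",
    "build_table": "ssh remote-host /opt/jobs/build_table.sh",
}

dag = DAG(
    dag_id="derived_tables_presto_cbot_events_1",
    schedule_interval=None,  # manual triggers only while testing
    start_date=datetime(2019, 12, 1),
)

# build one task per spec entry and chain them linearly
tasks = [
    BashOperator(task_id=task_id, bash_command=command, dag=dag)
    for task_id, command in TASK_SPECS.items()
]
for upstream, downstream in zip(tasks, tasks[1:]):
    upstream >> downstream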
Additionally, when I reload the web UI (or open it in a different browser tab), I can see the DAG stuck in the RUNNING state (all its TaskInstances have state / PID = NULL).
Interestingly enough, once this happens, and until I mark the DAG as failed via the UI, I keep getting the following stacktrace at /logs/scheduler/latest/<path/to/my/dag_script.py.log> (apparently thrown from here):
[2019-12-24 06:27:32,851] {jobs.py:1446} INFO - Processing derived_tables_presto_cbot_events_1
[2019-12-24 06:27:32,855] {jobs.py:921} INFO - Examining DAG run <DagRun derived_tables_presto_cbot_events_1 @ 2019-12-23 13:08:36.747641+00:00: manual__2019-12-23T13:08:36.747641+00:00, externally triggered: True>
[2019-12-24 06:27:32,856] {jobs.py:410} ERROR - Got an exception! Propagating...
Traceback (most recent call last):
  File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/jobs.py", line 402, in helper
    pickle_dags)
  File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/utils/db.py", line 73, in wrapper
    return func(*args, **kwargs)
  File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/jobs.py", line 1760, in process_file
    self._process_dags(dagbag, dags, ti_keys_to_schedule)
  File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/jobs.py", line 1451, in _process_dags
    self._process_task_instances(dag, tis_out)
  File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/utils/db.py", line 73, in wrapper
    return func(*args, **kwargs)
  File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/jobs.py", line 930, in _process_task_instances
    if len(active_dag_runs) >= dag.max_active_runs:
TypeError: '>=' not supported between instances of 'int' and 'NoneType'
Once I mark the DAG as failed via the web UI, the above stacktrace disappears from the scheduler logs (the TaskInstances of that DAG, however, continue to exist in Airflow's meta-db unless I manually delete them using a DELETE FROM .. SQL query).
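For the record, the manual cleanup looks roughly like this (an illustrative query, assuming direct access to the metadata DB; task_instance is Airflow's TaskInstance table, and the dag_id is the one from the logs above):

-- remove the lingering TaskInstances of the stuck DAG
DELETE FROM task_instance
WHERE dag_id = 'derived_tables_presto_cbot_events_1';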
I've tried several things, without success:
- verified that the LocalExecutor deployment is still intact (the older workflow's DAGs are still running fine)
- restarted the scheduler & webserver several times
- ran airflow initdb
- checked the airflow.cfg file for any discrepancies
I'm using:
- Python 3.7.3 (installed via PyEnv)
- Airflow 1.10.3 with LocalExecutor
- Linux ip-XXX-XX-XX-XX 4.9.0-8-amd64 #1 SMP Debian 4.9.130-2 (2018-10-27) x86_64 GNU/Linux
Upvotes: 0
Views: 6906
Reputation: 11607
As rightly pointed out by @kaxil, the bug was in one of my custom operators: I had carelessly assigned a method defined within the operator's class as its on_failure_callback. A bound method carries a reference to self, i.e. to the task itself, so when Airflow tried to hash the task it hashed the callback attribute, which in turn hashed the task again, and so on, leading to the RecursionError and the exploding DAG:
from airflow.models import BaseOperator

class MyCustomOperator(BaseOperator):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # bug: the bound method holds a reference to self, so hashing
        # the task ends up hashing the task again, recursively
        self.on_failure_callback = self.kill_task
    ..
    def kill_task(self) -> None:
        # do some cleanup work
        ...
Moving the kill_task() method outside the class (turning it into a plain module-level function, so there is no self parameter pointing back to the task) resolved the issue.
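For completeness, a minimal sketch of the fixed version (assuming the callback only needs the context dict that Airflow passes to failure callbacks):

from airflow.models import BaseOperator

def kill_task(context):
    # module-level function: no bound self, so hashing the task
    # no longer recurses back into the task itself
    # do some cleanup work
    ...

class MyCustomOperator(BaseOperator):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.on_failure_callback = kill_task  # plain function reference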
Upvotes: 1