Reputation: 2207
The Airflow scheduler crashes when I trigger a DAG manually from the dashboard.
executor = DaskExecutor
Airflow version = 1.10.7
sql_alchemy_conn = postgresql://airflow:airflow@localhost:5432/airflow
python version = 3.6
The scheduler logs from the crash are:
[2020-08-20 07:01:49,288] {scheduler_job.py:1148} INFO - Sending ('hello_world', 'dummy_task', datetime.datetime(2020, 8, 20, 1, 31, 47, 20630, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default
[2020-08-20 07:01:49,288] {base_executor.py:58} INFO - Adding to queue: ['airflow', 'run', 'hello_world', 'dummy_task', '2020-08-20T01:31:47.020630+00:00', '--local', '--pool', 'default_pool', '-sd', '/workflows/dags/helloWorld.py']
/mypython/lib/python3.6/site-packages/airflow/executors/dask_executor.py:63: UserWarning: DaskExecutor does not support queues. All tasks will be run in the same cluster
'DaskExecutor does not support queues. '
distributed.protocol.pickle - INFO - Failed to serialize <function DaskExecutor.execute_async.<locals>.airflow_run at 0x12057a9d8>. Exception: Cell is empty
[2020-08-20 07:01:49,292] {scheduler_job.py:1361} ERROR - Exception when executing execute_helper
Traceback (most recent call last):
File "/mypython/lib/python3.6/site-packages/distributed/worker.py", line 843, in dumps_function
result = cache[func]
KeyError: <function DaskExecutor.execute_async.<locals>.airflow_run at 0x12057a9d8>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/mypython/lib/python3.6/site-packages/distributed/protocol/pickle.py", line 38, in dumps
result = pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
AttributeError: Can't pickle local object 'DaskExecutor.execute_async.<locals>.airflow_run'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/mypython/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1359, in _execute
self._execute_helper()
File "/mypython/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1420, in _execute_helper
if not self._validate_and_run_task_instances(simple_dag_bag=simple_dag_bag):
File "/mypython/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1482, in _validate_and_run_task_instances
self.executor.heartbeat()
File "/mypython/lib/python3.6/site-packages/airflow/executors/base_executor.py", line 130, in heartbeat
self.trigger_tasks(open_slots)
File "/mypython/lib/python3.6/site-packages/airflow/executors/base_executor.py", line 154, in trigger_tasks
executor_config=simple_ti.executor_config)
File "/mypython/lib/python3.6/site-packages/airflow/executors/dask_executor.py", line 70, in execute_async
future = self.client.submit(airflow_run, pure=False)
File "/mypython/lib/python3.6/site-packages/distributed/client.py", line 1279, in submit
actors=actor)
File "/mypython/lib/python3.6/site-packages/distributed/client.py", line 2249, in _graph_to_futures
'tasks': valmap(dumps_task, dsk3),
File "/mypython/lib/python3.6/site-packages/toolz/dicttoolz.py", line 83, in valmap
rv.update(zip(iterkeys(d), map(func, itervalues(d))))
File "/mypython/lib/python3.6/site-packages/distributed/worker.py", line 881, in dumps_task
return {'function': dumps_function(task[0]),
File "/mypython/lib/python3.6/site-packages/distributed/worker.py", line 845, in dumps_function
result = pickle.dumps(func)
File "/mypython/lib/python3.6/site-packages/distributed/protocol/pickle.py", line 51, in dumps
return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
File "/mypython/lib/python3.6/site-packages/cloudpickle/cloudpickle_fast.py", line 101, in dumps
cp.dump(obj)
File "/mypython/lib/python3.6/site-packages/cloudpickle/cloudpickle_fast.py", line 540, in dump
return Pickler.dump(self, obj)
File "/usr/local/opt/[email protected]/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 409, in dump
self.save(obj)
File "/usr/local/opt/[email protected]/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/mypython/lib/python3.6/site-packages/cloudpickle/cloudpickle_fast.py", line 722, in save_function
*self._dynamic_function_reduce(obj), obj=obj
File "/mypython/lib/python3.6/site-packages/cloudpickle/cloudpickle_fast.py", line 659, in _save_reduce_pickle5
dictitems=dictitems, obj=obj
File "/usr/local/opt/[email protected]/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 610, in save_reduce
save(args)
File "/usr/local/opt/[email protected]/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/local/opt/[email protected]/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 751, in save_tuple
save(element)
File "/usr/local/opt/[email protected]/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/local/opt/[email protected]/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 736, in save_tuple
save(element)
File "/usr/local/opt/[email protected]/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/mypython/lib/python3.6/site-packages/dill/_dill.py", line 1146, in save_cell
f = obj.cell_contents
ValueError: Cell is empty
[2020-08-20 07:01:49,302] {helpers.py:322} INFO - Sending Signals.SIGTERM to GPID 11451
[2020-08-20 07:01:49,303] {dag_processing.py:804} INFO - Exiting gracefully upon receiving signal 15
[2020-08-20 07:01:49,310] {dag_processing.py:1379} INFO - Waiting up to 5 seconds for processes to exit...
[2020-08-20 07:01:49,318] {helpers.py:288} INFO - Process psutil.Process(pid=11451, status='terminated') (11451) terminated with exit code 0
[2020-08-20 07:01:49,319] {helpers.py:288} INFO - Process psutil.Process(pid=11600, status='terminated') (11600) terminated with exit code None
[2020-08-20 07:01:49,319] {scheduler_job.py:1364} INFO - Exited execute loop
I am running this on macOS Catalina, in case that helps isolate the error.
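From the traceback, the crash happens while distributed tries to pickle the local airflow_run closure defined inside DaskExecutor.execute_async. Below is a simplified sketch of that code path, with names taken from the traceback; the signature is flattened for illustration and the real dask_executor.py may differ:

import subprocess
from distributed import Client

def execute_async(client, command):
    # DaskExecutor builds a local closure over the task command ...
    def airflow_run():
        return subprocess.check_call(command, close_fds=True)

    # ... and submits it; distributed has to pickle airflow_run to ship it
    # to a worker, and that serialization is what raises "Cell is empty".
    return client.submit(airflow_run, pure=False)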
Upvotes: 2
Views: 2227
Reputation: 480
This started happening with newer versions of the downstream Dask dependencies. Pinning the versions fixes the issue:
pip uninstall cloudpickle distributed
pip install cloudpickle==1.4.1 distributed==2.17.0
These were the problematic versions:
cloudpickle==1.6.0
distributed==2.26.0
I run Airflow 1.10.10 in Docker and use the same image for Dask 2.13.0.
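If you want to catch this kind of version drift early, you could add a small startup check. This is a hypothetical sketch, not part of Airflow; the versions are just the pins above and pkg_resources ships with setuptools:

import pkg_resources

# fail fast if the environment was rebuilt with the broken versions again
PINS = {"cloudpickle": "1.4.1", "distributed": "2.17.0"}
for name, wanted in PINS.items():
    installed = pkg_resources.get_distribution(name).version
    if installed != wanted:
        raise RuntimeError(f"{name} is {installed}, expected {wanted}")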
Upvotes: 1
Reputation: 1976
I believe this issue may be what you are experiencing.
Looking at that ticket, it appears to still be open: a fix has been made, but it has not yet made it into an official release.
This pull request contains the fix for the issue linked above; you could try building your Airflow stack locally from that branch and see if it resolves the issue for you.
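Roughly, that would mean installing from a local checkout of the PR branch. A hedged sketch, where <fix-branch> is a placeholder for whatever branch the PR linked above points at:

git clone https://github.com/apache/airflow.git
cd airflow
git checkout <fix-branch>   # the branch from the PR linked above
pip install --upgrade .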
Upvotes: 1