Reputation: 105
I want to configure Dask to distribute DAGs in Airflow. I have read https://airflow.apache.org/howto/executor/use-dask.html and https://distributed.readthedocs.io/en/latest/, but I don't understand how it works. I have two Apache Airflow servers. Where should I run the dask-scheduler and dask-workers so that, if the first server goes down, everything automatically keeps running on the second? As I understand it, Airflow submits tasks to the dask-scheduler. I don't understand how to make the dask-schedulers on the two servers cooperate. I also don't understand why dask-workers are needed and what they do; they seem like an unnecessary component.
I don't want to use the CeleryExecutor and have to configure RabbitMQ or Redis for Celery.
Upvotes: 2
Views: 1988
Reputation: 23
I figured this one out after a lot of research. It turns out to be an issue with the Dask Executor (https://issues.apache.org/jira/browse/AIRFLOW-4494). I applied the fix to the Docker image I use to run Airflow and that solved it. The fix is scheduled for the next release.
# Patch dask_executor.py (AIRFLOW-4494): execute the command list directly instead of through a shell
RUN cd /usr/local/lib/python3.6/site-packages/airflow/executors && \
    sed -i "s@return subprocess.check_call(command, shell=True, close_fds=True)@return subprocess.check_call(command, close_fds=True)@g" dask_executor.py
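For anyone wondering what the sed actually changes: the DaskExecutor builds each task as a list of arguments (['airflow', 'run', ...]) and hands it to subprocess. Roughly, the patched line in dask_executor.py goes from the first form to the second (this is the gist of the change, not the exact Airflow source):
# with shell=True, only the first list element is passed to the shell as the command,
# so plain "airflow" runs without a subcommand and fails with
# "airflow: error: the following arguments are required: subcommand"
return subprocess.check_call(command, shell=True, close_fds=True)
# without shell=True, the full argument list is executed directly
return subprocess.check_call(command, close_fds=True)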
Upvotes: 0
Reputation: 21
You can find two better descriptions of how to get Airflow + Dask up and running here: https://www.alibabacloud.com/blog/schedule-data-lake-analytics-tasks-by-using-airflow_594183 or https://tech.marksblogg.com/install-and-configure-apache-airflow.html. In particular, the first link helped me a lot. Running the Airflow scheduler + webserver + dask-scheduler works fine. However, as soon as I start an Airflow worker + dask-worker, the Airflow worker exits and complains that Celery is missing:
ModuleNotFoundError: No module named 'celery'
And when I run only the dask-worker, without an Airflow worker, everything seems to work fine until I trigger a DAG:
worker_1 | [2019-05-12 20:47:05,527] {__init__.py:51} INFO - Using executor DaskExecutor
worker_1 | usage: airflow [-h]
worker_1 | {backfill,list_dag_runs,list_tasks,clear,pause,unpause,trigger_dag,delete_dag,pool,variables,kerberos,render,run,initdb,list_dags,dag_state,task_failed_deps,task_state,serve_logs,test,webserver,resetdb,upgradedb,scheduler,worker,flower,version,connections,create_user,delete_user,list_users,sync_perm,next_execution,rotate_fernet_key}
worker_1 | ...
worker_1 | airflow: error: the following arguments are required: subcommand
worker_1 | distributed.worker - WARNING - Compute Failed
worker_1 | Function: airflow_run
worker_1 | args: ()
worker_1 | kwargs: {}
worker_1 | Exception: CalledProcessError(2, ['airflow', 'run', 'example_python_operator', 'print_the_context', '2019-05-12T20:47:02.111022+00:00', '--pickle', '13', '--local', '-sd', '/opt/airflow/dags/python_exec.py'])
worker_1 |
webserver_1 | [2019-05-12 20:47:06 +0000] [37] [INFO] Handling signal: ttin
webserver_1 | [2019-05-12 20:47:06 +0000] [744] [INFO] Booting worker with pid: 744
webserver_1 | [2019-05-12 20:47:06,299] {dask_executor.py:77} ERROR - Failed to execute task: CalledProcessError(2, ['airflow', 'run', 'example_python_operator', 'print_the_context', '2019-05-12T20:47:02.111022+00:00', '--pickle', '13', '--local', '-sd', '/opt/airflow/dags/python_exec.py'])
Any hints on how to fix this?
Upvotes: 2
Reputation: 73
You run the scheduler on one server, not two. On one machine I run just the Airflow scheduler and the dask-scheduler. In the Airflow config, the Dask scheduler address is localhost:8786. Then, on the other machines, you start a dask-worker and give it the IP address and port of your scheduler. Submit a task via Airflow and, if everything is set up correctly, the workers will pick it up. A minimal sketch of that layout follows; the addresses and the worker IP are just examples for illustration:
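# airflow.cfg on the machine that runs the Airflow scheduler
[core]
executor = DaskExecutor

[dask]
# address of the dask-scheduler that the DaskExecutor submits tasks to
cluster_address = 127.0.0.1:8786

# on that same machine
dask-scheduler
airflow scheduler
airflow webserver

# on every other machine, point a dask-worker at the scheduler's address
# (10.0.0.1 is a placeholder for the scheduler machine's IP)
dask-worker 10.0.0.1:8786
Note that each worker machine also needs Airflow installed with the same configuration and access to the DAGs, since the dask-worker ends up running airflow run ... for each task.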
Upvotes: 0