Reputation: 3892
After using the dask multiprocessing scheduler for a long period of time, I noticed that the python processes started by the multiprocessing scheduler take a lot of memory. How can I restart the worker pool?
Upvotes: 3
Views: 2306
Reputation: 6710
Update: You can kill the workers started by the multiprocessing scheduler like this:
```python
from dask.context import _globals

pool = _globals.pop('pool')  # remove the pool from globals to make dask create a new one
pool.close()
pool.terminate()
pool.join()
```
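That snippet reaches into a dask-internal API, but the mechanics are just those of a standard `multiprocessing.Pool`. As a stdlib-only sketch (no dask required) of what closing, terminating, and joining a pool does, and why a fresh pool starts from clean processes:

```python
from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    # Dask's multiprocessing scheduler keeps a pool like this one alive
    # between computations; the worker processes accumulate memory.
    pool = Pool(4)
    result = pool.map(square, range(5))
    pool.close()      # stop accepting new tasks
    pool.terminate()  # kill the worker processes immediately
    pool.join()       # wait for them to exit

    # A fresh pool (like the one dask recreates on the next compute)
    # starts from brand-new processes with a clean memory footprint.
    pool = Pool(4)
    print(pool.map(square, range(5)))  # [0, 1, 4, 9, 16]
    pool.close()
    pool.join()
```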
First answer:
For tasks that consume a lot of memory, I prefer to use the distributed scheduler, even on localhost.
It's very straightforward:
```
$ dask-scheduler
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO -   Scheduler at:  1.2.3.4:8786
distributed.scheduler - INFO -        http at:  1.2.3.4:9786
distributed.bokeh.application - INFO - Web UI: http://1.2.3.4:8787/status/
distributed.scheduler - INFO - -----------------------------------------------
distributed.core - INFO - Connection from 1.2.3.4:56240 to Scheduler
distributed.core - INFO - Connection from 1.2.3.4:56241 to Scheduler
distributed.core - INFO - Connection from 1.2.3.4:56242 to Scheduler
```
```
$ dask-worker --nprocs 8 --nthreads 1 --memory-limit .8 1.2.3.4:8786
distributed.nanny - INFO - Start Nanny at: 127.0.0.1:61760
distributed.nanny - INFO - Start Nanny at: 127.0.0.1:61761
distributed.nanny - INFO - Start Nanny at: 127.0.0.1:61762
distributed.nanny - INFO - Start Nanny at: 127.0.0.1:61763
distributed.worker - INFO - Start worker at: 127.0.0.1:61765
distributed.worker - INFO -       nanny at: 127.0.0.1:61760
distributed.worker - INFO -        http at: 127.0.0.1:61764
distributed.worker - INFO - Waiting to connect to: 127.0.0.1:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 1
distributed.nanny - INFO - Start Nanny at: 127.0.0.1:61767
distributed.worker - INFO - Memory: 1.72 GB
distributed.worker - INFO - Local Directory: /var/folders/55/nbg15c6j4k3cg06tjfhqypd40000gn/T/nanny-11ygswb9
...
```
Then use the distributed.Client class to submit your jobs:

```
In [1]: from distributed import Client

In [2]: client = Client('1.2.3.4:8786')

In [3]: client
<Client: scheduler="127.0.0.1:61829" processes=8 cores=8>

In [4]: from distributed.diagnostics import progress

In [5]: import dask.bag

In [6]: data = dask.bag.range(10000, 8)

In [7]: data
dask.bag

In [8]: future = client.compute(data.sum())

In [9]: progress(future)
[########################################] | 100% Completed |  0.0s

In [10]: future.result()
49995000
```
I found this approach more reliable than the default scheduler. I prefer to submit the task explicitly and handle the future, so I can use the progress widget, which is really nice in a notebook. You can also keep doing other work while waiting for the results.
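The submit-then-handle-the-future pattern above is the same shape the standard library exposes. As a rough stdlib analogy (this is `concurrent.futures`, not dask itself; the partitioning mirrors `dask.bag.range(10000, 8)`):

```python
from concurrent.futures import ProcessPoolExecutor

def part_sum(rng):
    # Sum one partition of the data.
    return sum(rng)

if __name__ == '__main__':
    # Split range(10000) into 8 interleaved partitions,
    # loosely like dask.bag.range(10000, 8).
    chunks = [range(i, 10000, 8) for i in range(8)]
    with ProcessPoolExecutor(max_workers=8) as ex:
        futures = [ex.submit(part_sum, c) for c in chunks]
        # You can keep doing other work here while the futures run.
        total = sum(f.result() for f in futures)
    print(total)  # 49995000
```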
If you get errors due to memory issues, you can restart the workers or the scheduler (i.e., start all over again), use smaller chunks of data, and try again.
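The retry-with-smaller-chunks idea can be sketched as a plain-Python helper. Everything here is hypothetical (the `process_chunk` function and chunk sizes are made up for illustration); the point is only the fallback loop:

```python
def process_chunk(chunk):
    # Hypothetical per-chunk computation; stands in for whatever
    # memory-hungry work a real task would do.
    return sum(chunk)

def compute_with_retry(data, chunk_sizes=(1000, 100, 10)):
    """Try progressively smaller chunk sizes if memory runs out."""
    for size in chunk_sizes:
        try:
            return sum(process_chunk(data[i:i + size])
                       for i in range(0, len(data), size))
        except MemoryError:
            continue  # fall back to the next, smaller chunk size
    raise MemoryError("all chunk sizes failed")

print(compute_with_retry(list(range(10000))))  # 49995000
```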
Upvotes: 2