Reputation: 8538
I'm trying to use Dask on a cluster and want to terminate all the workers as soon as all the jobs are done. I tried to do that with the retire_workers method, but it doesn't seem to kill the workers. Here is an example.
import time
import os
from dask.distributed import Client

def long_func(x):
    time.sleep(2)
    return 1

if __name__ == '__main__':
    C = Client(scheduler_file='sched.json')
    res = []
    for _ in range(10):
        res.append(C.submit(long_func, _))
    for r in res:
        r.result()

    workers = list(C.scheduler_info()['workers'])
    # C.run(lambda: os._exit(0), workers=workers)
    C.retire_workers(workers=workers, close_workers=True)
The scheduler and a worker were started with these commands:
dask-scheduler --scheduler-file sched.json
dask-worker --scheduler-file sched.json --nthreads=1 --lifetime='5minutes'
The hope was that after executing the Python code above, the worker would terminate (after about 20 seconds), but it does not; it stays up for the whole 5-minute lifetime. Any advice on how to fix this?
Upvotes: 1
Views: 1603
Reputation: 372
I'd recommend using a context manager to manage the cluster; it's nice and clean. I've had issues with RAM getting maxed out and stalling my computer when working locally, but here is an example of what I frequently use:
# start our Dask cluster
from dask.distributed import Client, LocalCluster

if __name__ == '__main__':
    cluster = LocalCluster()
    with Client(cluster) as client:
        print("scheduler host: ", client.scheduler.address)
        # do some stuff
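If you also want the workers torn down when the block exits, LocalCluster itself can be used as a context manager as well. A minimal sketch of that variant (same idea as above, just nesting both context managers):

import time
from dask.distributed import Client, LocalCluster

def long_func(x):
    time.sleep(2)
    return 1

if __name__ == '__main__':
    # both the cluster and the client are closed automatically on exit
    with LocalCluster() as cluster, Client(cluster) as client:
        futures = client.map(long_func, range(10))
        print(client.gather(futures))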
Upvotes: 2
Reputation: 16551
This will shut down the connected scheduler and retire the workers:
C.shutdown()
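Applied to the question's setup, a minimal sketch (assuming the same sched.json scheduler file) would be to call shutdown once all results are in, instead of retire_workers:

import time
from dask.distributed import Client

def long_func(x):
    time.sleep(2)
    return 1

if __name__ == '__main__':
    C = Client(scheduler_file='sched.json')
    futures = [C.submit(long_func, i) for i in range(10)]
    C.gather(futures)
    # shuts down the scheduler and its workers, then closes the client
    C.shutdown()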
Upvotes: 2