Reputation: 3096
My code looks something like this:

def myfunc(param):
    # expensive work that takes 2-3 hours per call
    ...

mylist = [...]
client = Client(...)
mgr = DeploymentMgr()
# ... set up stateful set ...
futures = client.map(myfunc, mylist, ..., resources={mgr.hash.upper(): 1})
client.gather(futures)
I have Dask running on a Kubernetes cluster. At the start of the program I create a stateful set via kubernetes.client.AppsV1Api(). Then I wait for up to 30 minutes until all of the workers I have requested are available. For this example, say I request 10 workers, but after 30 minutes only 7 are available. Finally, I call client.map() and pass it a function and a list of 10 elements. However, Dask only ever uses those 7 workers to process the list! Even if the remaining 3 workers become available a couple of minutes later, Dask does not assign any list elements to them, even though none of the first tasks have finished yet.
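The waiting step described above could be sketched with Client.wait_for_workers. This is a minimal, self-contained sketch using LocalCluster as a stand-in for the Kubernetes-backed cluster (the worker count and timeout are illustrative, not the values from the real deployment):

```python
from dask.distributed import Client, LocalCluster

# LocalCluster stands in for the Kubernetes stateful set in this sketch.
cluster = LocalCluster(n_workers=2, threads_per_worker=1,
                       processes=False, dashboard_address=None)
client = Client(cluster)

# Block until the requested number of workers has joined, or raise
# after the timeout (30 minutes in the question; seconds here).
client.wait_for_workers(n_workers=2, timeout=30)
print(len(client.scheduler_info()["workers"]))

client.close()
cluster.close()
```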
How can I change that behaviour of Dask? Is there a way to tell Dask (or its scheduler) to periodically check for newly arriving workers and distribute the work more evenly? Or can I manually influence the distribution of these list elements?
Thank you.
Upvotes: 4
Views: 347
Reputation: 57251
Dask will rebalance work once it has a better estimate of how long your tasks take. You can provide an up-front estimate of task duration with the configuration values
distributed:
  scheduler:
    default-task-durations:
      myfunc: 1hr
Alternatively, once Dask finishes one of these tasks, it will use the measured runtime to make scheduling decisions for that kind of task in the future.
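The same hint can also be set from Python with dask.config.set; a sketch, with the caveat that this is scheduler-side configuration, so it must be in effect in the environment where the scheduler starts (setting it in the client process after the scheduler is already running will not help):

```python
import dask

# Set the estimated duration for tasks named "myfunc"; by default the
# task name is the mapped function's name. Do this before the
# scheduler/cluster is created.
dask.config.set({"distributed.scheduler.default-task-durations.myfunc": "1hr"})

print(dask.config.get("distributed.scheduler.default-task-durations.myfunc"))
# → 1hr
```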
I believe this has also come up a couple of times on the GitHub issue tracker; you may want to search https://github.com/dask/distributed/issues for more information.
Upvotes: 4