Reputation: 26
We are seeing some strange behavior from the dask distributed scheduler.
With 200 workers, we distribute 1200 tasks that are essentially identical: long tasks that alternate between being CPU-bound and IO-bound. Each worker is assigned between 4 and 7 tasks.
The behavior we see is that whenever an IO operation begins, the worker switches to another of its tasks. This appears to cause large amounts of memory to accumulate on each worker; we suspect this triggers spilling to disk and slows things down considerably.
In testing (using the same task as a benchmark), we found that with ~2 tasks per worker there was no slowdown, but at around ~5 tasks per worker the slowdown became significant. The thing is, there is really very little time to gain from the worker switching from task to task.
How can we make each worker complete its tasks sequentially? We want each worker to work on one task at a time to avoid this large RAM usage.
It feels like we could write a wrapper that feeds n*2 tasks at a time (where n = number of workers) to the scheduler, but surely there is a way to configure this behavior?
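For reference, the wrapper idea can be sketched with the standard library's `concurrent.futures` as a stand-in for dask (dask's `distributed.as_completed` supports the same submit-as-you-complete pattern). The names `run_batched` and `window_factor` are hypothetical, not dask API:

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

def run_batched(fn, inputs, n_workers, window_factor=2):
    """Keep at most n_workers * window_factor tasks in flight at once."""
    results = []
    it = iter(inputs)
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        in_flight = set()
        # Prime the window with the first n_workers * window_factor tasks.
        for _ in range(n_workers * window_factor):
            try:
                in_flight.add(pool.submit(fn, next(it)))
            except StopIteration:
                break
        # Each time a task finishes, submit one more to replace it.
        while in_flight:
            done, in_flight = wait(in_flight, return_when=FIRST_COMPLETED)
            for fut in done:
                results.append(fut.result())
            for _ in range(len(done)):
                try:
                    in_flight.add(pool.submit(fn, next(it)))
                except StopIteration:
                    break
    return results
```

With a distributed `Client`, the same windowing would use `client.submit` plus `distributed.as_completed`, which lets you add new futures to the iterator as old ones complete.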
Dask version:
0.19.1
Kubectl version:
Client Version: version.Info{Major:"1", Minor:"9", GitVersion:"v1.9.6", GitTreeState:"clean", BuildDate:"2018-03-21T15:21:50Z", GoVersion:"go1.9.3", Compiler:"gc", Platform:"linux/amd64"}
Server Version: version.Info{Major:"1", Minor:"9", GitVersion:"v1.9.2", GitCommit:"5fa2db2bd46ac79e5e00a4e6ed24191080aa463b", GitTreeState:"clean", BuildDate:"2018-01-18T09:42:01Z", GoVersion:"go1.9.2", Compiler:"gc", Platform:"linux/amd64"}
client.scheduler_info() output:
{'type': 'Scheduler',
'id': 'Scheduler-4b3d7cac-d536-4f66-b0bd-22d9ae19b260',
'address': 'tcp://192.168.152.162:8786',
'services': {'bokeh': 8787},
'workers': {'tcp://192.168.148.132:32860': {'type': 'Worker',
'id': 'tcp://192.168.148.132:32860',
'host': '192.168.148.132',
'resources': {},
'local_directory': '/dask-worker-space/worker-7nzwyqd6',
'name': 'tcp://192.168.148.132:32860',
'ncores': 1,
'memory_limit': 3500000000,
'last_seen': 1538522342.4690368,
'services': {'nanny': 44983},
'metrics': {'cpu': 2.0,
'memory': 42975232,
'time': 1538522342.0465984,
'read_bytes': 0.0,
'write_bytes': 0.0,
'num_fds': 25,
'executing': 0,
'in_memory': 0,
'ready': 0,
'in_flight': 0}},
'tcp://192.168.148.147:35760': {'type': 'Worker',
'id': 'tcp://192.168.148.147:35760',
'host': '192.168.148.147',
'resources': {},
'local_directory': '/dask-worker-space/worker-yuh3l9uh',
'name': 'tcp://192.168.148.147:35760',
'ncores': 1,
'memory_limit': 3500000000,
'last_seen': 1538522342.4663892,
'services': {'nanny': 38760},
'metrics': {'cpu': 2.0,
'memory': 42905600,
'time': 1538522342.0460682,
'read_bytes': 0.0,
'write_bytes': 0.0,
'num_fds': 25,
'executing': 0,
'in_memory': 0,
'ready': 0,
'in_flight': 0}},
(then there are a bunch more workers)
Let me know if there is any particular config info I can provide.
Upvotes: 1
Views: 158
Reputation: 57261
I would just give each worker a single thread:
dask-worker ... --nthreads 1
You may also want to look at the help text for dask-worker:
dask-worker --help
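For example, assuming a scheduler reachable at scheduler-address:8786 and four worker processes per node (both hypothetical values for your Kubernetes pod spec), the worker command might look like:

```shell
# One thread per worker process, so each process runs one task at a time;
# --nprocs spawns several single-threaded processes from one command.
dask-worker tcp://scheduler-address:8786 --nthreads 1 --nprocs 4
```

With --nthreads 1, a worker will not interleave tasks on extra threads, though the scheduler may still queue several tasks per worker; the batching wrapper you describe would be the way to bound that queue as well.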
Upvotes: 1