Reputation: 33
Hello,
I came across a dask.array example where computation time and required memory differ vastly between the threaded (shared-memory) schedulers (dask.get, dask.threaded.get) and the schedulers that use worker processes (dask.multiprocessing.get, distributed.Client().get).
I tested this setup on a 2013 Core i7 MacBook Pro (macOS, 16 GB RAM, SSD).
The simulation at hand performs a meshgrid operation on ndims vectors of length dimlen, applies some simple elementwise operation to them, and then obtains the final result by summing over every dimension. Without dask, a single copy of the meshed array would be as big as 8*(dimlen**ndims)/1024**3 = 7.4 GByte. Since we have ndims = 3 arguments, doing everything with plain copies of the meshed arguments would require more than 16 GB of RAM. (By the way: if the example were approached with plain numpy, numpy.broadcast_to and numpy.transpose would not create full copies anyway; only the creation of res would allocate a full 7.4 GByte array.)
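For reference, a quick back-of-the-envelope check of those numbers (just a sketch, assuming float64, i.e. 8 bytes per element, and the parameter values used in the code below):

ndims, dimlen = 3, 1000
full_copy_gib = 8 * dimlen**ndims / 1024**3   # one fully meshed argument: ~7.45 GiB
all_copies_gib = ndims * full_copy_gib        # three plain copies: ~22.4 GiB > 16 GB RAM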
So far I understand that the slowdown with the multiprocessing and distributed schedulers is due to massive RAM consumption and tasks that spill to disk (as seen on the distributed diagnostics web page). However, I cannot explain this behavior, because my current understanding of the dask graph to be calculated is:
The idea behind using dask here is to reduce memory requirements by chunking with dask.array. The maximum size of a copy of one meshed argument chunk-piece is 8*(chunklen**ndims)/1024**2 = 7.6 MByte, assuming float64. During the reduction (one dimension at a time), when chunk-pieces have to be concatenated, we should only end up with a maximum chunk-piece of 8*(dimlen/chunklen)*(chunklen**ndims)/1024**2 = 76 MByte. Assuming we have 4 processes, we would still only need four times these chunk sizes at any given time. Yet resource monitoring shows the full 16 GB of RAM being consumed for the schedulers that involve several processes.
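The same back-of-the-envelope sketch for the chunked figures quoted above (again assuming float64):

chunklen = 100
chunk_piece_mib = 8 * chunklen**ndims / 1024**2                              # ~7.6 MiB per meshed chunk-piece
reduction_piece_mib = 8 * (dimlen // chunklen) * chunklen**ndims / 1024**2   # ~76 MiB while concatenating along one axis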
I would appreciate a deeper explanation of what exactly I am missing here.
Thanks in advance, Markus
import numpy as np
import dask.array as da
from dask import get as single_threaded_get
from dask.threaded import get as threaded_get
from dask.multiprocessing import get as multiprocessing_get
ndims = 3
dimlen = 1000
chunklen = 100
# Some input data, which usually would come elsewhere
xs = [np.random.randn(dimlen) for _ in range(ndims)]
# Cast them to dask.array
ys = [da.from_array(x, chunks=chunklen) for x in xs]
# Meshgrid
zs = [da.broadcast_to(y, ndims*(dimlen,)) for y in ys]
zs = [da.rechunk(z, chunks=ndims*(chunklen,)) for z in zs]
_a = tuple(range(ndims))
zs = [da.transpose(z, axes=_a[i:] + _a[:i]) for i, z in enumerate(zs)]
# Some simple element-wise processing of n dimensional arguments
res = zs[0]
for z in zs[1:]:
    res = res + z
# Some reduction of all dimensions to a scalar
for i in range(ndims):
    res = da.sum(res, axis=-1)
res
# dask.array<sum-aggregate, shape=(), dtype=float64, chunksize=()>
len(list(res.dask.keys()))
# 12617
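As a rough orientation (only an estimate; the exact key count depends on dask's internal graph construction, so it will not reproduce 12617 exactly):

chunks_per_axis = dimlen // chunklen          # 10
chunks_per_array = chunks_per_axis ** ndims   # 1000 chunk tasks per meshed argument
# Three meshed arguments, each passing through broadcast/rechunk/transpose stages,
# plus the element-wise additions and the staged sum reductions, account for the
# roughly 12600 keys reported above.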
%%timeit -n 1 -r 1
x = res.compute(get=single_threaded_get)
#10.4 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
%%timeit -n 1 -r 1
x = res.compute(get=threaded_get)
#7.32 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
%%timeit -n 1 -r 1
x = res.compute(get=multiprocessing_get)
#5h 14min 52s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
from distributed import Client
client = Client()
%%timeit -n 1 -r 1
x = res.compute(get=client.get)
#7min 37s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
The conda environment was created with:
$ conda create -n py35 python=3.5 dask distributed jupyter
$ source activate py35
$ jupyter notebook .
!conda list -e
# This file may be used to create an environment using:
# $ conda create --name <env> --file <this file>
# platform: osx-64
appnope=0.1.0=py35_0
bleach=1.5.0=py35_0
bokeh=0.12.5=py35_1
chest=0.2.3=py35_0
click=6.7=py35_0
cloudpickle=0.2.2=py35_0
dask=0.14.3=py35_0
decorator=4.0.11=py35_0
distributed=1.16.3=py35_0
entrypoints=0.2.2=py35_1
heapdict=1.0.0=py35_1
html5lib=0.999=py35_0
icu=54.1=0
ipykernel=4.6.1=py35_0
ipython=6.0.0=py35_1
ipython_genutils=0.2.0=py35_0
ipywidgets=6.0.0=py35_0
jedi=0.10.2=py35_2
jinja2=2.9.6=py35_0
jsonschema=2.6.0=py35_0
jupyter=1.0.0=py35_3
jupyter_client=5.0.1=py35_0
jupyter_console=5.1.0=py35_0
jupyter_core=4.3.0=py35_0
locket=0.2.0=py35_1
markupsafe=0.23=py35_2
mistune=0.7.4=py35_0
mkl=2017.0.1=0
msgpack-python=0.4.8=py35_0
nbconvert=5.1.1=py35_0
nbformat=4.3.0=py35_0
notebook=5.0.0=py35_0
numpy=1.12.1=py35_0
openssl=1.0.2k=2
pandas=0.20.1=np112py35_0
pandocfilters=1.4.1=py35_0
partd=0.3.8=py35_0
path.py=10.3.1=py35_0
pexpect=4.2.1=py35_0
pickleshare=0.7.4=py35_0
pip=9.0.1=py35_1
prompt_toolkit=1.0.14=py35_0
psutil=5.2.2=py35_0
ptyprocess=0.5.1=py35_0
pygments=2.2.0=py35_0
pyqt=5.6.0=py35_2
python=3.5.3=1
python-dateutil=2.6.0=py35_0
pytz=2017.2=py35_0
pyyaml=3.12=py35_0
pyzmq=16.0.2=py35_0
qt=5.6.2=2
qtconsole=4.3.0=py35_0
readline=6.2=2
requests=2.14.2=py35_0
setuptools=27.2.0=py35_0
simplegeneric=0.8.1=py35_1
sip=4.18=py35_0
six=1.10.0=py35_0
sortedcollections=0.5.3=py35_0
sortedcontainers=1.5.7=py35_0
sqlite=3.13.0=0
tblib=1.3.2=py35_0
terminado=0.6=py35_0
testpath=0.3=py35_0
tk=8.5.18=0
toolz=0.8.2=py35_0
tornado=4.5.1=py35_0
traitlets=4.3.2=py35_0
wcwidth=0.1.7=py35_0
wheel=0.29.0=py35_0
widgetsnbextension=2.0.0=py35_0
xz=5.2.2=1
yaml=0.1.6=0
zict=0.1.2=py35_0
zlib=1.2.8=3
Hello @kakk11,
thank you very much for your answer and your effort. I have investigated further, and if I scale your example up to my problem size, i.e. ndims = 3, dimlen = 1000, chunklen = 100, I get the behavior below, where again the multiprocessing scheduler takes much longer for the "slow solution". However, bear in mind that I need the structure of the slow solution because of the real application.
%%timeit -r 1 -n 1
fast_solution(x).compute(get=single_threaded_get)
# 2min 4s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
%%timeit -r 1 -n 1
fast_solution(x).compute(get=threaded_get)
# 49.5 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
%%timeit -r 1 -n 1
fast_solution(x).compute(get=multiprocessing_get)
# 55.4 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
%%timeit -r 1 -n 1
slow_solution(x).compute(get=single_threaded_get)
# 2min 21s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
%%timeit -r 1 -n 1
slow_solution(x).compute(get=threaded_get)
# 56.6 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
%%timeit -r 1 -n 1
slow_solution(x).compute(get=multiprocessing_get)
# 10min 31s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
You can also run the following code to see the difference between the dask graphs of the two versions: fast_solution contains only parallel paths until the very end, while slow_solution has a more pyramidal structure. But given my reasoning about the individual chunk sizes passed along, I still don't see what the problem is.
ndims = 2
dimlen = 1000
chunklen = 500
# ...
from dask.dot import dot_graph
dot_graph(fast_solution(x).dask)
dot_graph(slow_solution(x).dask)
Hello @MRocklin,
thanks for your answer. I understand that some calculation graphs have a high data-interchange cost and that the example at hand belongs to this category. I have simplified it further, down to a meshgrid-equivalent operation on two dimensions followed by a reduction of all dimensions to a scalar. Maybe this can serve as an example to give more insight into whether this behavior of the distributed scheduler is normal, or whether the example simply has an intrinsically high data-interchange cost. Please be advised that the example should only be used to profile memory usage, not CPU performance; being a simplified example, it does not produce enough workload for the CPU.
I want to emphasize that the reason I am following up on this issue is my vision that dask/distributed should be able to handle graph structures whose total intermediate array sizes are well beyond the accumulated RAM of the workers, provided the chunk sizes and the local graph structure stay well below a single worker's RAM capacity. The example realizes such a graph structure.
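As an illustration of what I mean (only a sketch; whether these keyword arguments are available depends on the distributed version), one could cap each worker's memory so that spilling to disk kicks in well before the machine's RAM is exhausted:

from distributed import Client, LocalCluster
# Hypothetical local setup: four single-threaded workers, each limited to ~3 GB,
# so intermediate results larger than that must be spilled rather than held in RAM.
cluster = LocalCluster(n_workers=4, threads_per_worker=1, memory_limit=3e9)
client = Client(cluster)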
So here is the updated jupyter notebook: dask_distributed_RAM_usage_meshgrid_operation.ipynb.zip
I look forward to more discussion on this topic and am grateful for the effort you have spent on this issue.
Markus
Upvotes: 2
Views: 2203
Reputation: 57271
I suspect that your computation forces a great deal of data interchange, which is free when everything runs in the same process but potentially expensive across processes: the data has to be serialized and then moved between workers.
The dask schedulers generally try to order tasks so that intermediate results can be cleaned up quickly, but this is not always possible. When I run your computation on my machine and watch the distributed scheduler's web diagnostic dashboard, I see that most of the time is spent on inter-process communication and on spilling data to disk and reading it back.
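For anyone who wants to reproduce this, roughly how I watch that dashboard (a minimal sketch; the Bokeh diagnostic pages are normally served on port 8787):

from distributed import Client
client = Client()   # starts a LocalCluster together with the diagnostic web UI
print(client)       # the repr includes the scheduler address
# Then open http://localhost:8787/status in a browser to watch task progress,
# inter-worker communication, and disk spilling while compute() runs.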
I haven't looked deeply enough at your problem to determine whether this is intrinsic to the problem or a flaw in how dask schedules things. If you can simplify your computation further while still showing the same performance flaw, that would make it easier to diagnose.
Upvotes: 0
Reputation: 918
Thanks for the interesting test. It looks like multiprocessing_get has problems with the sum over a list, and I can only guess why. The multiprocessing scheduler is used by default for dask.Bag, which targets generic Python objects rather than arrays, and it does not perform well when inter-process communication is needed.
Anyway, when you use dask functions for all steps of the computation, it actually works fast in all cases; see my example:
import dask.array as da
from dask.multiprocessing import get as multiprocessing_get
import time
t0 = time.time()
ndims = 3
dimlen = 400
chunklen = 100
x = [da.random.normal(10, 0.1, size=ndims*(dimlen,), chunks=ndims*(chunklen,)) for k in range(ndims)]
def slow_solution(x):
    res = x[0]
    for z in x[1:]:
        res = res + z
    return da.sum(res)

def fast_solution(x):
    return da.sum(da.stack(x))
t1 = time.time()
print("start fast f-n")
fast_solution(x).compute(get=multiprocessing_get)
t2 = time.time()
print("start slow f-n")
slow_solution(x).compute(get=multiprocessing_get)
t3 = time.time()
print("Whole script: ", t3 - t0)
print("fast function: ", t2 - t1)
print("slow function: ", t3 - t2)
Update
Is there a particular reason why you need multiprocessing_get and the threaded scheduler does not work for you, or are you just curious? The dask documentation is not exactly comprehensive, but from what I gather the multiprocessing scheduler is mainly intended for dask.Bag, which is a more general solution for arbitrary Python objects, and it has known performance limitations; see http://dask.pydata.org/en/latest/shared.html#known-limitations
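To illustrate that last point, here is a rough sketch of the kind of workload the multiprocessing scheduler is aimed at: generic Python objects in a dask.bag with almost no inter-process communication (the numbers and functions are made up for illustration):

import dask.bag as db
from dask.multiprocessing import get as multiprocessing_get

# Embarrassingly parallel work on plain Python objects: each partition is
# processed independently, so very little data has to move between processes.
b = db.from_sequence(range(10000), npartitions=8)
result = b.map(lambda i: i ** 2).sum().compute(get=multiprocessing_get)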
Upvotes: 1