Reputation: 393
A dot product in dask seems to run much slower than in numpy:
import numpy as np
x_np = np.random.normal(10, 0.1, size=(1000,100))
y_np = x_np.transpose()
%timeit x_np.dot(y_np)
# 100 loops, best of 3: 7.17 ms per loop
import dask.array as da
x_dask = da.random.normal(10, 0.1, size=(1000,100), chunks=(5,5))
y_dask = x_dask.transpose()
%timeit x_dask.dot(y_dask)
# 1 loops, best of 3: 6.56 s per loop
Does anybody know what might be the reason for that? Is there anything I'm missing here?
Upvotes: 4
Views: 2272
Reputation: 393
The dot product in dask runs much faster when the chunks are adjusted:
import dask.array as da
x_dask = da.random.normal(10, 0.1, size=(1000,100), chunks=1000)
y_dask = x_dask.transpose()
%timeit x_dask.dot(y_dask)
# 1000 loops, best of 3: 330 µs per loop
More about chunks can be found in the dask docs.
Edit: As @MRocklin wrote, to measure the actual computation time one must call .compute() on the result.
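For example, re-running the timing with the computation forced (a sketch; the measured time depends on the machine and the chunk layout):
import dask.array as da
# Same setup as above; chunks=1000 gives a single chunk for a (1000, 100) array.
x_dask = da.random.normal(10, 0.1, size=(1000, 100), chunks=1000)
y_dask = x_dask.transpose()
# .compute() executes the task graph, so this measures the real work
# rather than just the graph construction timed above.
%timeit x_dask.dot(y_dask).compute()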
Upvotes: 0
Reputation: 57251
The answer by @isternberg is correct that you should adjust chunk sizes. A good choice of chunk size follows these rules:
I generally shoot for chunks that are 1-100 megabytes large. Anything smaller than that isn't helpful and usually creates enough tasks that scheduling overhead becomes our largest bottleneck.
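As a quick sanity check on that guideline (a back-of-the-envelope sketch, assuming float64 data at 8 bytes per element):
import numpy as np
chunk_shape = (1000, 1000)                   # the chunks used in the larger example below
chunk_mb = np.prod(chunk_shape) * 8 / 1e6    # 8.0 MB, comfortably inside the 1-100 MB range
# The (5, 5) chunks from the question hold only 5 * 5 * 8 = 200 bytes each and
# split the (1000, 100) array into (1000 // 5) * (100 // 5) = 4000 pieces,
# so per-task scheduling overhead dominates the actual arithmetic.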
If your array is only of size (1000, 100) then there is no reason to use dask.array. Instead, use numpy and, if you really care about using multiple cores, make sure that your numpy library is linked against an efficient BLAS implementation like MKL or OpenBLAS.
If you use a multi-threaded BLAS implementation you might actually want to turn dask threading off. The two systems will clobber each other and reduce performance. If this is the case then you can turn off dask threading with the following command.
dask.set_options(get=dask.async.get_sync)
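For reference, set_options and dask.async are the dask API as it existed when this answer was written; on recent dask releases the same switch is spelled differently (a sketch, assuming a current dask version):
import dask
import dask.array as da
# Use the single-threaded ("synchronous") scheduler so dask threads do not
# compete with a multi-threaded BLAS for the same cores.
dask.config.set(scheduler='synchronous')
x = da.random.normal(10, 0.1, size=(2000, 2000), chunks=(1000, 1000))
result = x.dot(x.T).compute()   # per-chunk parallelism is left to BLAS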
To actually time the execution of a dask.array computation you'll have to add a .compute() call to the end of the computation; otherwise you're just timing how long it takes to create the task graph, not to execute it.
In [1]: import dask.array as da
In [2]: x = da.random.normal(10, 0.1, size=(2000, 100000), chunks=(1000, 1000)) # larger example
In [3]: %time z = x.dot(x.T) # create task graph
CPU times: user 12 ms, sys: 3.57 ms, total: 15.6 ms
Wall time: 15.3 ms
In [4]: %time _ = z.compute() # actually do work
CPU times: user 2min 41s, sys: 841 ms, total: 2min 42s
Wall time: 21 s
Upvotes: 5