SednaGammaPrime

Reputation: 21

Dask delayed performance issues

I'm relatively new to Dask. I'm trying to parallelize a "custom" function that doesn't use Dask containers; I just want to speed up the computation. However, when I parallelize with dask.delayed, the performance is significantly worse than the serial version. Here is a minimal implementation demonstrating the issue (the code I actually want to do this with is significantly more involved :) )

import dask, time

def mysum(rng):
    # CPU intensive
    z = 0
    for i in rng:
        z += i
    return z

# serial
b = time.time(); zz = mysum(range(1, 1_000_000_000)); t = time.time() - b
print(f'time to run in serial {t}')
# parallel
ms_parallel = dask.delayed(mysum)
ss = []
ncores = 10
m = 100_000_000
for i in range(ncores):
    lower = m*i
    upper = (i+1) * m
    r = range(lower, upper)
    s = ms_parallel(r)
    ss.append(s)

j = dask.delayed(ss)
b = time.time(); yy = j.compute(); t = time.time() - b
print(f'time to run in parallel {t}')

Typical results are:

time to run in serial 55.682398080825806
time to run in parallel 135.2043571472168

It seems I'm missing something basic here.

Upvotes: 2

Views: 547

Answers (1)

mdurant

Reputation: 28673

You are running a pure CPU-bound computation in threads by default. Because of Python's Global Interpreter Lock (GIL), only one thread actually runs at a time. In short, you are only adding overhead to your original computation, due to thread switching and task execution.

To actually get faster for this workload, you should use dask.distributed. Just adding

import dask.distributed
client = dask.distributed.Client(threads_per_worker=1)

at the start of your script may well give you a decent speedup, since this starts a number of worker processes, each with its own GIL. This scheduler becomes the default just by creating the client.
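
For example, here is a rough sketch of your script adapted to the distributed scheduler. The n_workers=10 / threads_per_worker=1 values are illustrative only (by default the Client picks sensible values for your machine), and the if __name__ == '__main__' guard is there because the workers are separate processes:

import time
import dask
import dask.distributed

def mysum(rng):
    # CPU intensive
    z = 0
    for i in rng:
        z += i
    return z

if __name__ == '__main__':
    # Creating the client makes the distributed scheduler the default.
    # Worker counts here are illustrative; adjust to your machine.
    client = dask.distributed.Client(n_workers=10, threads_per_worker=1)

    ms_parallel = dask.delayed(mysum)
    m = 100_000_000
    tasks = [ms_parallel(range(m * i, m * (i + 1))) for i in range(10)]

    b = time.time()
    results = dask.compute(*tasks)  # executes on the worker processes
    t = time.time() - b
    print(f'time to run in parallel {t}; total = {sum(results)}')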

EDIT: ignore the following, I see you are already doing it :). Leaving it here for others, unless people want it gone ...

The second problem, for Dask, is the sheer number of tasks. For any task-execution system, there is an overhead associated with each task (and it is actually higher for distributed than for the default threaded scheduler). You can get around it by computing batches of function calls per task. This is, in practice, what dask.array and dask.dataframe do: they operate on fairly large pieces of the overall problem, so that the overhead becomes small compared to the useful CPU execution time.
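
If the chunks were much smaller and more numerous, batching might look roughly like this; mysum_batch and the chunk/batch sizes below are made up purely for illustration:

import dask

def mysum(rng):
    z = 0
    for i in rng:
        z += i
    return z

def mysum_batch(ranges):
    # One task handles several chunks, amortising the per-task overhead.
    return sum(mysum(r) for r in ranges)

m = 10_000_000
chunks = [range(m * i, m * (i + 1)) for i in range(100)]   # 100 small chunks
batch_size = 10
batches = [chunks[i:i + batch_size] for i in range(0, len(chunks), batch_size)]

tasks = [dask.delayed(mysum_batch)(b) for b in batches]    # 10 tasks instead of 100
total = sum(dask.compute(*tasks))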

Upvotes: 2
