lo tolmencre

Reputation: 3934

dask.delayed results in no speedup

I am trying to get into Dask. To that end, I attempted to parallelize some time-consuming sequential code of mine. The original code is this:

def sequential():
    sims = [] 
    chunksize = len(tokens)//4
    for i in range(0, len(tokens), chunksize):
        print(i, i+chunksize)
        chunk = tokens[i:i+chunksize]
        sims.append(process(chunk))     
    return sims

%time sequential()

and the parallelized code is this:

def parallel():
    sims = []
    chunksize = len(tokens)//4
    for i in range(0, len(tokens), chunksize):
        print(i, i+chunksize)
        chunk = dask.delayed(tokens[i:i+chunksize])
        sims.append(dask.delayed(process)(chunk))
    return dask.delayed(sims)

%time parallel().visualize()

But the parallelized code always runs around 10% slower than the sequential one. When I visualize the computation graph for sims I get this:

[dask task graph: the delayed process calls on each chunk feeding into a single list node]

Not sure where list-#8 comes from, but other than that the graph looks correct. So why is there no speedup? When I look at htop I see 3 cores active (~30% load each), while for the sequential code I see only 1 core active (100% load). The sequential code runs in 7 minutes and the parallel code in 7-8 minutes.

I guess I am misunderstanding how delayed and compute should be used here?


Here is the setup, in case you need it:

import numpy
import spacy
import dask

nlp = spacy.load('en_core_web_lg')

with open('./words.txt', 'r') as f:
    text = " ".join(line.strip() for line in f)

tokens = [t for t in nlp(text) if 1 < len(t.text) < 20]


def process(chunk):
    # Similarity of every token in the chunk against every token overall.
    sims = numpy.zeros([len(chunk), len(tokens)], dtype=numpy.float32)
    for i in range(len(chunk)):
        for j in range(len(tokens)):
            sims[i, j] = chunk[i].similarity(tokens[j])
    return sims

Upvotes: 1

Views: 1179

Answers (1)

mdurant

Reputation: 28683

You are seeing this behaviour because the default execution engine for dask is based on multiple threads in a single process (the "threaded" scheduler). Python has a lock, the GIL, which ensures the safety of the interpreter by allowing only one thread to execute Python bytecode at a time. Since your nested loops are pure Python, each thread spends most of its time waiting for the lock to become available. To avoid this problem, you have two options:

  • find a version of your computation that releases the GIL. This is possible if you can phrase it as (mainly) a numpy, pandas, numba, etc., computation: code that executes at the C level and doesn't need the interpreter, unlike your nested Python loops. See the first sketch below.
  • run your code using processes, with either the "multiprocessing" scheduler or (better) the "distributed" scheduler, which, despite the name, also runs well on a single machine. See the second sketch below.
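For the first option, here is a minimal sketch, assuming that token.similarity() is cosine similarity over token.vector (which holds for spaCy's vector-based models) and that every token has a non-zero vector. The double loop in process then collapses into a single matrix multiply that runs in compiled code and releases the GIL:

def process_vectorized(chunk):
    # Stack the word vectors into (len(chunk), dim) and (len(tokens), dim) matrices.
    a = numpy.stack([t.vector for t in chunk])
    b = numpy.stack([t.vector for t in tokens])
    # Normalize the rows so the dot products below are cosine similarities.
    a /= numpy.linalg.norm(a, axis=1, keepdims=True)
    b /= numpy.linalg.norm(b, axis=1, keepdims=True)
    # One matrix multiplication computes all len(chunk) * len(tokens) pairs
    # at the C level, outside the GIL.
    return a @ b.T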

Further information: http://dask.pydata.org/en/latest/scheduler-overview.html
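As for the second option, a minimal sketch, assuming a reasonably recent dask where compute() accepts a scheduler= keyword (older versions use get=dask.multiprocessing.get instead):

sims = parallel()  # the delayed list from the question

# Process-based scheduler: each chunk runs in its own process, so the GIL
# held in one worker does not block the others.
results = sims.compute(scheduler='processes')

# Or, with the distributed scheduler on a single machine:
from dask.distributed import Client
client = Client()  # starts local worker processes
results = client.compute(sims).result()

Note that with processes the chunks have to be serialized and shipped to the workers, which can be costly for spaCy objects.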

Upvotes: 2
