Sid R

Reputation: 33

Dask delayed / dask array no response

I have a distributed dask cluster setup and I have used it to load and transform a bunch of data. Works like a charm.

I want to use it to do some processing in parallel. Here's my function:

import numpy as np
import dask
import dask.array as da

el = 5000
n_using = 26
n_across = 6

mat = np.random.random((el, n_using, n_across))
idx = np.tril_indices(n_across * 2, -n_across)

def get_vals(c1, m, el, idx):
    m1 = m[c1, :, :]
    corr_vals = np.zeros((el, (n_across // 2) * (n_across + 1)))
    for c2 in range(c1 + 1, el):
        corr = np.corrcoef(m1.T, m[c2, :, :].T)
        corr_vals[c2] = corr[idx]

    return corr_vals

lazy_get_val = dask.delayed(get_vals, pure=True)

Here is a single-processor version of what I'm trying to do:

arrays = [get_vals(c1, mat, el, idx) for c1 in range(el)]
all_corr = np.stack(arrays, axis=0)

Works fine but takes a few hours. Here's my go at doing this in dask:

lazy_list = [lazy_get_val(c1, mat, el, idx) for c1 in range(el)]
arrays = [da.from_delayed(lazy_item, dtype=float, shape=(el, 21)) for lazy_item in lazy_list]
all_corr = da.stack(arrays, axis=0)

Even if I run all_corr[1].compute(), it just sits there and doesn't respond. When I interrupt the kernel, it seems to be stuck in distributed/utils.py:

~/.../lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)

    249     else:
    250         while not e.is_set():
--> 251             e.wait(10)
    252     if error[0]:
    253         six.reraise(*error[0])

Any suggestions on debugging this?


Upvotes: 2

Views: 545

Answers (1)

MRocklin

Reputation: 57319

After adding imports to the example, I ran it and found it was very slow while building the graph. This can be improved by avoiding placing numpy arrays directly in delayed calls, as follows:

# Build these lazily instead of embedding the concrete numpy arrays
# in every delayed call:
# mat = np.random.random((el, n_using, n_across))
# idx = np.tril_indices(n_across * 2, -n_across)
mat = dask.delayed(np.random.random)((el, n_using, n_across))
idx = dask.delayed(np.tril_indices)(n_across * 2, -n_across)

Or by removing the pure=True keyword from dask.delayed: when you set pure=True, Dask has to hash the contents of all the inputs to get a unique key for them, and you're doing that 5000 times here. I found this out by profiling your code with the %snakeviz magic in IPython.
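That alternative would look something like this (a minimal sketch of the change, keeping the rest of your example unchanged):

# Without pure=True, dask assigns each call a random key instead of
# hashing mat and idx for every one of the 5000 tasks.
lazy_get_val = dask.delayed(get_vals)

lazy_list = [lazy_get_val(c1, mat, el, idx) for c1 in range(el)]
arrays = [da.from_delayed(lazy_item, dtype=float, shape=(el, 21)) for lazy_item in lazy_list]
all_corr = da.stack(arrays, axis=0)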

I then ran all_corr[1].compute() and it was fine. I then ran all_corr.compute() and it looked like it would run to completion, but it wasn't very fast. I suspect that either your tasks are too small, so there is too much per-task overhead, or your code spends too much time in Python for loops and so runs into GIL issues. I'm not sure which.

The next thing I would recommend is trying the dask.distributed scheduler, which would handle the GIL issue better but exacerbate the overhead issue. Seeing how that performs would probably help isolate the problem.
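For example (a minimal sketch; the scheduler address is a placeholder for your own cluster, or call Client() with no arguments to start a local one for testing):

from dask.distributed import Client

# Connecting a Client makes the distributed scheduler the default,
# so subsequent .compute() calls run on the cluster's workers.
client = Client("tcp://scheduler-address:8786")  # placeholder address

result = all_corr[1].compute()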

Upvotes: 2
