Reputation: 33
I have a distributed dask cluster setup and I have used it to load and transform a bunch of data. Works like a charm.
I'm want to use it do some processing in parallel. Here's my function
el = 5000
n_using = 26
n_across= 6
mat = np.random.random((el,n_using,n_across))
idx = np.tril_indices(n_across*2, -n_across)
def get_vals(c1, m, el, idx):
m1 = m[c1,:,:]
corr_vals = np.zeros((el, (n_across//2)*(n_across+1)))
for c2 in range(c1+1, el):
corr = np.corrcoef(m1.T, m[c2,:,:].T)
corr_vals[c2] = corr[idx]
return corr_vals
lazy_get_val = dask.delayed(get_vals, pure=True)
Here is a single processor version of what I'm trying to do:
arrays = [get_vals(c1, mat, el, idx) for c1 in range(el)]
all_corr = np.stack(arrays, axis=0)
Works fine but takes a few hours. Here's my go at doing this in dask:
lazy_list = [lazy_get_val(c1, mat, el, idx) for c1 in range(el)]
arrays = [da.from_delayed(lazy_item, dtype=float, shape=(el, 21)) for lazy_item in lazy_list]
all_corr = da.stack(arrays, axis=0)
Even if it run all_corr[1].compute()
, it just sits there and doesn't respond. When I interrupt the kernel, it seems to be stuck at /distributed/utils.py:
~/.../lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
249 else: 250 while not e.is_set(): --> 251 e.wait(10) 252 if error[0]: 253 six.reraise(*error[0])
Any suggestions on debugging this?
Other things:
mat
(el=1000) and it runs fine.el = 5000
, it hangs.el = 1000
, it hangs.Upvotes: 2
Views: 545
Reputation: 57319
After adding imports to the example I ran things and it was very slow while building the graph. This can be improved by avoiding placing numpy arrays directly in delayed calls as follows:
# mat = np.random.random((el,n_using,n_across))
# idx = np.tril_indices(n_across*2, -n_across)
mat = dask.delayed(np.random.random)((el,n_using,n_across))
idx = dask.delayed(np.tril_indices)(n_across*2, -n_across)
Or by removing the pure=True
keyword to dask.delayed (when you set pure=True it has to hash the contents of all inputs to get a unique key for them, you're doing this 5000 times). I found this out by profiling your code with the %snakeviz
magic in IPython.
I then ran all_corr[1].compute()
and it was fine. I then ran all_corr.compute()
and it seemed like it would progress to completion, but wasn't very fast. I suspect that either your tasks are too small so that there is too much overhead, or that your code is spending too much time in Python for loops and so is running into GIL issues. Not sure which.
The next thing I would recommend trying would be using the dask.distributed scheduler, which would handle the GIL issue better and exacerbate the overhead issue. Seeing how that performed would probably help isolate the issue.
Upvotes: 2