A32167

Reputation: 26

How to compose tasks in dask-distributed

I am trying to run a joblib parallel loop inside a threaded dask-distributed cluster (see below for the reason), but I can't get any speedup because of the GIL. Here's an example:

def task(x):
    """Sample single-process task that takes between 2 and 5 seconds."""
    import time
    import random
    dt = random.uniform(2, 5)
    time.sleep(dt)
    return x + dt

def composite_task(np=8):
    """Composite task that runs multiple single-process tasks in parallel."""
    from joblib import Parallel, delayed, parallel_backend
    with parallel_backend('loky', n_jobs=np):
        out = Parallel()(delayed(task)(i) for i in range(np))
    return out

The single-CPU task takes 3.5 seconds on average:

%timeit -n7 -r1 task(0)
3.61 s ± 0 ns per loop (mean ± std. dev. of 1 run, 7 loops each)

joblib works as expected: 8 tasks take no longer than the single longest one:

%timeit -n1 -r1 composite_task(8)
5.03 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

However, when I try to run this code inside a dask LocalCluster with 8 threads, I don't get any speedup:

from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=1, threads_per_worker=8)
client = Client(cluster)

%timeit -n1 -r1 client.submit(composite_task,8).result()
25.5 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

Probably I'm misunderstanding how the GIL works. Please help. The full notebook can be viewed here:

http://nbviewer.jupyter.org/gist/aeantipov/6d670e13cd503741e9ef5b0299719a8e


The reason for trying this is the need to run more than 10k GIL-holding tasks on about 50 nodes with 32 CPUs each. It is easy to set up a dask-jobqueue cluster with 50 workers × 32 threads, but not one with 1600 single-threaded workers. And unfortunately, since the tasks hold the GIL, following this example http://matthewrocklin.com/blog/work/2018/06/26/dask-scaling-limits doesn't give any significant speedup over running 50 workers.
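
For reference, this is roughly how such a cluster is created (a minimal sketch assuming a SLURM scheduler; the queue, memory, and walltime values are placeholders):

from dask_jobqueue import SLURMCluster
from dask.distributed import Client

# One multi-threaded worker per job: 50 jobs x 32 threads each.
# Queue, memory, and walltime values are placeholders.
cluster = SLURMCluster(cores=32, processes=1, memory='64GB',
                       queue='normal', walltime='04:00:00')
cluster.scale(50)  # 50 workers, each running 32 threads
client = Client(cluster)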


dask                      0.19.1
dask-core                 0.19.1
dask-jobqueue             0.3.0
python                    3.7.0
distributed               1.23.1

Upvotes: 0

Views: 308

Answers (1)

MRocklin

Reputation: 57261

I would just use the dask joblib backend:

import joblib
from joblib import Parallel, delayed
from dask.distributed import Client, LocalCluster

cluster = LocalCluster()
client = Client(cluster)

with joblib.parallel_backend('dask'):
    out = Parallel()(delayed(task)(i) for i in range(8))

Your concerns about the GIL don't apply here: your function calls sleep, which releases the GIL during execution. If your actual function is pure-Python code that doesn't release the GIL, then I recommend launching a Dask cluster with many single-threaded processes. If you're using dask-jobqueue then you want to use the processes= keyword to control the number of worker processes per job.
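
For example, here is a rough sketch of that configuration with dask-jobqueue (again assuming a SLURM scheduler; the resource values are placeholders):

from dask_jobqueue import SLURMCluster

# 32 single-threaded worker processes per job, so pure-Python
# tasks that hold the GIL still run in parallel on each node.
cluster = SLURMCluster(cores=32, processes=32, memory='64GB',
                       queue='normal', walltime='04:00:00')
cluster.scale(1600)  # 1600 single-threaded workers across 50 jobs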

You can have many more tasks than you have processes.
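
For instance, a sketch that submits the whole workload at once (reusing the task function from the question):

from dask.distributed import Client

client = Client(cluster)
# Submit far more tasks than there are worker processes; the
# scheduler feeds them to workers as they become free.
futures = client.map(task, range(10000))
results = client.gather(futures)  # blocks until all tasks finish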

Upvotes: 1
