Reputation: 123
dask.compute(...) is expected to be a blocking call. However, when I have nested dask.compute calls, and the inner one does I/O (like dask.dataframe.read_parquet), the inner dask.compute does not appear to block. Here's a pseudo-code example:
import dask
import dask.dataframe
import distributed

def outer_func(name):
    files = find_files_for_name(name)
    df = inner_func(files).compute()
    # do work with df
    return result

def inner_func(files):
    # build one lazy dask DataFrame spanning all the files
    dfs = [dask.dataframe.read_parquet(f) for f in files]
    return dask.dataframe.concat(dfs)

client = distributed.Client(scheduler_file=...)
results = dask.compute([dask.delayed(outer_func)(name) for name in names])
If I started 2 workers with 8 processes each, like:
dask-worker --scheduler-file $sched_file --nprocs 8 --nthreads 1
then I would expect at most 2 x 8 concurrent inner_func calls, because inner_func(files).compute() should be blocking. However, what I observed was that within one worker process, as soon as the read_parquet step starts, another inner_func(files).compute() could start running. So in the end there could be multiple inner_func(files).compute() calls running at once, and sometimes this caused an out-of-memory error.
Is this expected behavior? If so, is there any way to enforce one inner_func(files).compute() per worker process?
Upvotes: 0
Views: 789
Reputation: 28673
When you ask the dask distributed scheduler to run work, it ships the code of the functions, and any data required, to workers that live in different processes, possibly on different machines. These worker processes faithfully execute the functions, which run as normal Python code. The point is that the running function does not know it is on a dask worker: by default it will see that there is no global dask distributed client set up, and do what dask would normally do in that case, namely execute any dask workloads on the default scheduler (the threaded one).
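As a minimal sketch of that fallback (assuming ddf is the lazy dask DataFrame returned by inner_func in the question), a bare .compute() inside a task uses the local default scheduler, and the scheduler can also be named explicitly:

def compute_locally(ddf):
    # with no distributed client visible, this runs on the local default
    # scheduler for dask.dataframe (the threaded one) inside this worker process
    df = ddf.compute()
    # the same thing, stated explicitly
    df = ddf.compute(scheduler="threads")
    # or force single-threaded, one-graph-at-a-time execution in this process
    df = ddf.compute(scheduler="synchronous")
    return df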
If you really must execute full dask compute operations within tasks, and want these to use the distributed scheduler that is running those tasks, you will need to use the worker client. However, I feel that in your case, rephrasing the job to remove the nesting, e.g. building the whole graph with delayed and dask.dataframe up front and calling compute only once at the top level, is probably the simpler approach.
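For the worker-client route, here is a minimal sketch of the question's outer_func rewritten to use distributed's worker_client context manager (find_files_for_name remains the question's placeholder helper):

import dask.dataframe as dd
from dask.distributed import worker_client

def outer_func(name):
    files = find_files_for_name(name)  # placeholder helper from the question
    ddf = dd.concat([dd.read_parquet(f) for f in files])
    with worker_client() as client:
        # run the inner graph on the same distributed scheduler that is
        # executing this task; worker_client secedes from the worker's
        # thread pool while waiting, so the blocking wait cannot deadlock it
        df = client.compute(ddf).result()
    # do work with df
    return df

Note that because the task secedes while it waits, the worker process may pick up other tasks in the meantime, so this fixes which scheduler the inner compute uses rather than limiting concurrency per process.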
Upvotes: 1
Reputation: 123
This does not appear to be the case with the multi-process scheduler.
In order to use the distributed scheduler, I found a workaround: paced job submission via the distributed.Client API rather than relying on dask.compute. dask.compute is fine for simple use cases, but it clearly has no idea how many outstanding tasks can be scheduled, and therefore can overrun the system in this case.
Here's the pseudo-code for running a collection of dask.Delayed tasks with pacing:
import distributed as distr

def paced_compute(tasks, batch_size, client):
    """
    Run delayed tasks, keeping at most batch_size of them running at any
    time. After the first batch is submitted, submit a new job only after
    an existing one finishes; continue until all tasks are computed.

    tasks: collection of dask.Delayed
    batch_size: maximum number of tasks in flight at once
    client: distributed.Client object
    """
    results, tasks = [], list(tasks)
    # submit the first batch and track its futures
    working_futs = client.compute(tasks[:batch_size])
    tasks = tasks[batch_size:]
    ac = distr.as_completed(working_futs)
    for fut in ac:
        results.append(fut.result())
        if tasks:
            # replace the finished task with a new submission
            ac.add(client.compute(tasks.pop()))
    return results
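For example, a usage sketch under the question's setup (assuming sched_file is the same scheduler file the workers were started with, and names/outer_func are as defined above); batch_size=16 matches the 2 x 8 worker processes:

import dask
import distributed as distr

client = distr.Client(scheduler_file=sched_file)
delayed_tasks = [dask.delayed(outer_func)(name) for name in names]
# at most 16 tasks in flight at once: one per worker process in the setup above
results = paced_compute(delayed_tasks, batch_size=16, client=client)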
Upvotes: 0