Reputation: 1123
I have a computational workload that I originally ran with concurrent.futures.ProcessPoolExecutor, which I converted to use dask so that I could make use of dask's integrations with distributed computing systems for scaling beyond one machine. The workload consists of two task types: A tasks, which take an input and produce a result (calc_a below), and B tasks, each of which takes the result of an A task together with its own input (calc_b below). There can be multiple B tasks for each A task.
Originally, my code looked like this:
a_results = client.map(calc_a, a_inputs)
all_b_inputs = [(a_result, b_input) for b_input in b_inputs for a_result in a_results]
b_results = client.map(calc_b, all_b_inputs)
dask.distributed.wait(b_results)
because that was the clean translation from the concurrent.futures code (I actually kept the code so that it could be run either with dask or concurrent.futures so I could compare). client here is a distributed.Client instance.
I have been experiencing some stability issues with this code, especially for large numbers of tasks, and I think I might not be using dask in the best way. Recently, I changed my code to use Delayed instead like this:
a_results = [dask.delayed(calc_a)(a) for a in a_inputs]
b_results = [dask.delayed(calc_b)(a, b) for a in a_results for b in b_inputs]
client.compute(b_results)
I did this because I thought perhaps the scheduler could work through the tasks more efficiently if it examined the entire graph before starting anything, rather than beginning to schedule the A tasks before knowing about the B tasks. This change seems to help somewhat, but I still see some stability issues.
I can create separate questions for the stability problems, but I first wanted to find out if I am using dask in the best way for this use case or if I should modify how I am submitting the tasks. To describe the problems briefly: the worst one is that over time my workers drop to 0% CPU and tasks stop completing. Other problems include getting KilledWorker exceptions and seeing log messages about an unresponsive event loop and timeouts. Usually the scheduler runs fine for at least a few hours, completing thousands of tasks, before these issues show up (which makes debugging difficult since the feedback loop is so long).
Some questions I have been wondering about: I use the lifetime parameter to retire workers to keep them from running over the time limit, and that works best with short-lived tasks (to avoid losing work when a worker is shut down early). Is there an optimal way to split the B tasks?

Upvotes: 1
Views: 1189
Reputation: 16581
There are lots of questions here, but with regard to the code snippets you provided: both look correct, but the futures version will scale better in my experience. The reason is that, by default, when one delayed task fails, the computation of all delayed tasks halts, while futures can proceed as long as they are not directly affected by the failure.
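For illustration, here is a minimal sketch of that behaviour with futures; the calc_a/calc_b bodies and the inputs are made up for the example, and as_completed comes from dask.distributed:

from dask.distributed import Client, as_completed

def calc_a(x):                     # stand-in for the question's A task
    return x * 2

def calc_b(a_result, b_input):     # stand-in for the question's B task
    if b_input < 0:                # simulate an occasional failure
        raise ValueError("bad input")
    return a_result + b_input

client = Client()
a_futures = client.map(calc_a, range(10))
b_futures = [client.submit(calc_b, a, b) for a in a_futures for b in (-1, 1, 2)]

# A failed future only raises when you look at its result; the other
# futures keep computing in the meantime.
for future in as_completed(b_futures):
    try:
        future.result()
    except ValueError:
        pass                       # log or retry just the failed task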
Another observation is that delayed values will tend to hold on to resources after completion, while for futures you can at least .release() them once they have been completed (or use fire_and_forget).
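A rough sketch of both options, using toy stand-in tasks (square and write_report are made up for the example):

from dask.distributed import Client, fire_and_forget, wait

def square(x):                     # trivial stand-in task
    return x * x

def write_report(path):            # stand-in for a task run only for its side effect
    with open(path, "w") as f:
        f.write("done")

client = Client()

# Release futures once their results have been consumed so the scheduler
# can drop the data instead of keeping it in worker memory.
futures = client.map(square, range(10))
wait(futures)
results = client.gather(futures)
for future in futures:
    future.release()

# fire_and_forget is for tasks whose results you never intend to gather:
# the scheduler keeps them running even with no local future referencing them.
fire_and_forget(client.map(write_report, [f"report_{i}.txt" for i in range(3)]))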
Finally, with very large task lists, it might be worth making them a bit more resilient to restarts. One basic option is to create simple text files after successful completion of a task, and then on restart check which tasks need to be re-computed. Fancier options include prefect and joblib.memory, but if you don't need all the bells and whistles, the text file route is often fastest.
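A minimal sketch of the text-file route; the marker-file naming, the done_markers directory and the toy inputs are illustrative, and the directory would need to live on storage that both the client and the workers can see:

import os
from dask.distributed import Client

DONE_DIR = "done_markers"          # illustrative checkpoint directory

def marker_path(task_id):
    return os.path.join(DONE_DIR, f"{task_id}.done")

def calc_b(a_result, b_input):     # stand-in for the question's B task
    return a_result + b_input

def run_b(task_id, a_result, b_input):
    result = calc_b(a_result, b_input)
    # (in real use, write `result` to durable storage here, before the marker)
    with open(marker_path(task_id), "w") as f:   # record success last
        f.write("ok")
    return result

if __name__ == "__main__":
    os.makedirs(DONE_DIR, exist_ok=True)
    client = Client()
    task_specs = [(a, b) for a in range(3) for b in range(4)]   # toy inputs
    # On (re)start, only submit the tasks that have no completion marker yet.
    todo = [(tid, a, b) for tid, (a, b) in enumerate(task_specs)
            if not os.path.exists(marker_path(tid))]
    futures = [client.submit(run_b, tid, a, b) for tid, a, b in todo]
    client.gather(futures)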
Upvotes: 1