ws_e_c421
ws_e_c421

Reputation: 1123

How to submit a large set of long running parallel tasks to dask?

I have a computational workload that I originally ran with concurrent.futures.ProcessPoolExecutor which I converted to use dask so that I could make use of dask's integrations with distributed computing systems for scaling beyond one machine. The workload consists of two task types:

There can multiple B tasks for each A task.

Originally, my code looked like this:

a_results = client.map(calc_a, a_inputs)
all_b_inputs = [(a_result, b_input) for b_input in b_inputs for a_result in a_results]
b_results = client.map(calc_b, all_b_inputs)
dask.distributed.wait(b_results)

because that was the clean translation from the concurrent.futures code (I actually kept the code so that it could be run either with dask or concurrent.futures so I could compare). client here is a distributed.Client instance.

I have been experiencing some stability issues with this code, especially for large numbers of tasks, and I think I might not be using dask in the best way. Recently, I changed my code to use Delayed instead like this:

a_results = [dask.delayed(calc_a)(a) for a in a_inputs]
b_results = [dask.delayed(calc_b)(a, b) for a in a_inputs for b in b_inputs]
client.compute(b_results)

I did this because I thought perhaps the scheduler could work through the tasks more efficiently if it examined the entire graph before starting anything rather than beginning to schedule the A tasks before knowing about the B tasks. This change seems to help some but I still see some stability issues.

I can create separate questions for the stability problems, but I first wanted to find out if I am using dask in the best way for this use case or if I should modify how I am submitting the tasks. Just to describe the problems briefly, the worst problem to me is that over time my workers drop to 0% CPU and tasks stop completing. Other problems include things like getting KilledWorker exceptions and seeing log messages about an unresponsive loop and time outs. Usually the scheduler runs fine for at least a few hours, completing thousands of tasks before these issues show up (which makes debugging difficult since the feedback loop is so long).

Some questions I have been wondering about:

  1. I can have thousands of tasks to run. Can I submit these all to dask to start out or do I need to submit them in batches? My thought was that the dask scheduler would be better at scheduling tasks than my batching code.
  2. If I do need to batch things myself, can I query the scheduler to find out the maximum number of workers so I can write something that will submit batches of the right size? Or do I need to make the batch size an input to my batching code?
  3. In the end, my results all get written to disk and nothing gets returned. With the way I am running tasks, are resources getting held onto longer than necessary?
  4. My B tasks are long but they could be split by scheduling tasks that solve for solutions at intermediate time steps and feeding those in as the inputs to subsequent solving tasks. I think I need to do this any way because I would like to use an HPC cluster with a timed queue and I think I need to use the lifetime parameter to retire workers to keep them from running over the time limit and that works best with short-lived tasks (to avoid losing work when shut down early). Is there an optimal way to split the B task?

Upvotes: 1

Views: 1189

Answers (1)

SultanOrazbayev
SultanOrazbayev

Reputation: 16581

There are lots of questions here, but with regards to the code snippets you provided, both look correct, but the futures version will scale better in my experience. The reason for that is that by default, whenever one of the delayed tasks fails, the computation of all delayed tasks halts, while futures can proceed as long as they are not directly affected by the failure.

Another observation is that delayed values will tend to hold on to resources after completion, while for futures you can at least .release() them once they have been completed (or use fire_and_forget).

Finally, with very large task lists, it might be worth to make them a bit more resilient to restarts. One basic option is to create simple text files after successful completion of a task, and then on restart check which tasks need to be re-computed. Fancier options include prefect and joblib.memory, but if you don't need all the bells and whistles, the text file route is often fastest.

Upvotes: 1

Related Questions