Mike

Reputation: 913

Strategy to distribute large number of jobs with dask on HPC cluster

I have a rather complex Python algorithm that I need to distribute across an HPC cluster.

The code is run from a JupyterHub instance with 60 GB of memory. The PBS cluster is configured with 1 process, 1 core and 30 GB per worker, and nanny=False (the computations won't run otherwise), for a total of 26 workers (about 726 GB of memory in total).

I do not need to fetch any data back, since everything that is needed is written to disk at the end of each computation. Note that each computation takes about 7 minutes when run independently.

The problem I run into is the following: each individual worker (job name: dask-worker) seems to run fine; it has about 20 GB available, of which at most 5 GB is used. But whenever I try to launch more than about 50 jobs, the central worker (job name: jupyterhub) runs out of memory after about 20 minutes.

Here is how I distribute the computations:

def complex_python_func(params):
    return compute(params=params).run()

I have then tried to use either client.map or delayed, as follows:

list_of_params = [1, 2, 3, 4, 5, ... n] # with n > 256

# With delayed
lazy = [dask.delayed(complex_python_func)(l) for l in list_of_params]
futures = client.compute(lazy)
# Or with map
chain = client.map(complex_python_func, list_of_params)

Here is the configuration of the cluster:

cluster = PBSCluster(
    cores=1,
    memory="30GB",
    interface="ib0",
    queue=queue,
    processes=1,
    nanny=False,
    walltime="12:00:00",
    shebang="#!/bin/bash",
    env_extra=env_extra,
    python=python_bin,
)
cluster.scale(32)
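
(Not shown above: the client is assumed to be created from this cluster in the standard dask.distributed way, e.g.:)

from dask.distributed import Client

# attach a client to the PBS cluster so that client.map / client.compute
# submit work to its scheduler
client = Client(cluster)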

I can't understand why it does not work. I would expect Dask to run each computation and then release the memory (roughly every 6-7 minutes, as each individual task finishes). I check the memory usage of the worker with qstat -f jobId and it keeps increasing until the worker is killed.

What is causing the jupyterhub worker to fail, and what would be a good (or at least better) way of achieving this?

Upvotes: 2

Views: 338

Answers (1)

SultanOrazbayev

Reputation: 16551

Two potential leads are:

  1. If the workers are not expected to return anything, then it might be worth changing the return statement to return None (it's not clear what compute() does in your script):
def complex_python_func(params):
    # run for its side effect (results are written to disk) and return None
    compute(params=params).run()
    return None
  2. It's possible that dask allocates more than one task per worker, and at some point a worker has more tasks than it can handle. One way out of this is to limit the number of tasks a worker can take at any given time with resources, e.g.:
# add resources when creating the cluster
cluster = PBSCluster(
    # all other settings are unchanged; this extra line gives each worker
    # a single unit of the abstract resource 'foo'
    extra=['--resources foo=1'],
)

# rest of the code is skipped, but make sure to specify the resources each
# task needs when submitting it for computation
lazy = [dask.delayed(complex_python_func)(l) for l in list_of_params]
futures = client.compute(lazy, resources={'foo': 1})
# Or with map
chain = client.map(complex_python_func, list_of_params, resources={'foo': 1})
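
In either case, since nothing needs to come back to the client, a minimal follow-up sketch (assuming the futures/chain objects above) would simply block until all tasks finish instead of gathering results:

from dask.distributed import wait

# block until every task has finished; no results are pulled back to the client
wait(futures)  # or wait(chain) for the client.map variant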

For more information on resources, see the documentation or this related question: Specifying Task Resources: Fractional gpu.

Upvotes: 1
