Reputation: 305
We’re using dask to optimize deep-learner (DL) architectures by generating candidate designs and sending them to dask workers, which in turn use pytorch for training. We observe that some of the workers do not appear to start, and that workers that do finish evaluating a DL do not immediately begin evaluating the next waiting DL.
We’ve implemented this on Oak Ridge National Laboratory’s Summit supercomputer. For our prototype, we submit a batch job that allocates 92 nodes, spins up a dask scheduler and 92 dask workers, with one worker dedicated to each node. Each node has six Nvidia Volta V100s, two IBM Power9 CPUs, and 512 GB of DDR4 plus 96 GB of HBM2 memory. Each worker then uses pytorch to train a DL and returns its validation accuracy as a “fitness.” However, if the posed DL architecture is not viable, an exception is thrown and the associated fitness becomes -MAXINT.
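For reference, eval_dl has roughly the following shape. This is a hypothetical sketch: the toy feed-forward builder and the constant 0.5 stand in for our real pytorch model construction and training loop, and only the try/except-to-sentinel pattern reflects the actual code.

import sys
import torch
from torch import nn

def eval_dl(dl_design):
    """Train the posed DL design and return its validation accuracy
    as the fitness, or -MAXINT if the design is not viable."""
    try:
        # Toy stand-in: treat dl_design as a list of layer widths and
        # build a simple feed-forward stack from it.
        layers = [nn.Linear(n_in, n_out)
                  for n_in, n_out in zip(dl_design, dl_design[1:])]
        model = nn.Sequential(*layers)
        model(torch.randn(8, dl_design[0]))  # stand-in for training/validation
        return 0.5                           # placeholder validation accuracy
    except Exception:
        # Malformed/unviable architecture: report the sentinel fitness.
        return -sys.maxsize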
In initial trial runs with just two workers, we noted that if a worker evaluated a malformed DL design, it would immediately be assigned a new DL to evaluate. Neither of the two workers was ever idle until the end of the run.
This is a condensed version of the actual code.
from dask.distributed import Client, as_completed

client = Client(scheduler_file='scheduler.json')

# posed_dl_designs is a list of random DL architectures,
# eval_dl is the entry point for the pytorch training
worker_futures = client.map(eval_dl, posed_dl_designs)

# Keep a handle on the as_completed iterator so that new futures
# can be added to it while we iterate over results
iterator = as_completed(worker_futures)

for res in iterator:
    evaluated_dl = res.result()

    # pool is a Queue of evaluated DLs sorted by validation
    # accuracy; update_pool() replaces the least accurate DL
    # with the newly evaluated DL
    update_pool(evaluated_dl, pool)

    # Let the workers drain down if we meet some kind of budget
    # for generated DL designs; otherwise generate a new DL and
    # give it to a worker for training/evaluation
    if not stop():
        # create_new_dl() selects one of the better DLs from
        # the pool, clones it, and alters it slightly, thereby
        # creating a new DL design
        new_dl = create_new_dl(pool)

        # Now evaluate/train the new DL
        new_future = client.submit(eval_dl, new_dl)
        iterator.add(new_future)
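The loop relies on the fact that the object returned by as_completed() accepts additional futures while it is being consumed. Here is a minimal, self-contained illustration of just that pattern; square(), the local Client(), and the budget of 10 are stand-ins for eval_dl, the Summit cluster, and stop(), not part of our code.

from dask.distributed import Client, as_completed

def square(x):
    return x * x

if __name__ == "__main__":
    client = Client()  # small local cluster, for illustration only
    iterator = as_completed(client.map(square, range(4)))
    submitted = 4
    for fut in iterator:
        print(fut.result())
        if submitted < 10:  # stand-in for the real stop() budget
            # Hand the iterator a follow-up future while we are still
            # consuming it, just as the controller loop above does.
            iterator.add(client.submit(square, submitted))
            submitted += 1
    client.close()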
And this is how we invoked the scheduler and workers:
# The scheduler doesn't need GPUs. It just needs one lonely core to run on.
jsrun --gpu_per_rs 0 --nrs 1 --tasks_per_rs 1 --cpu_per_rs 1 --rs_per_host 1 dask-scheduler --interface ib0 --no-bokeh --no-show --scheduler-file $SCHEDULER_FILE &
# Spin up an individual task for each worker. Since dask does not use MPI, specify smpiargs none.
for i in {0..91}; do
jsrun --smpiargs="none" --nrs 1 -e individual --stdio_stdout ${RUN_DIR}/worker_out.%h.%j.%t.%p --stdio_stderr ${RUN_DIR}/worker_error.%h.%j.%t.%p --tasks_per_rs 1 --cpu_per_rs 14 --gpu_per_rs 6 --rs_per_host 1 dask-worker --nthreads 1 --nprocs 1 --memory-limit 512e9 --interface ib0 --no-bokeh --reconnect --scheduler-file $SCHEDULER_FILE --resources "MEM=512e9" &
done
# Invocation for the controller process elided
When we scaled up the runs to employ 92 workers, we found that after a few minutes only five or six were running -- these corresponded to workers that had viable DLs to train as their first DL design candidates.
The idle workers fell into two categories. The majority had apparently evaluated a broken DL design and faithfully returned the special value indicating that, but then a new DL was never assigned to that now-free worker. The other class of workers never evaluated any DLs at all, and the following typifies their output:
distributed.nanny - INFO - Start Nanny at: 'tcp://10.41.18.55:45941'
distributed.diskutils - INFO - Found stale lock file and directory '/gpfs/alpine/csc342/proj-shared/may/delemera_first_trial_run/worker-c4o0rsb3', purging
distributed.worker - INFO - Start worker at: tcp://10.41.18.55:44107
distributed.worker - INFO - Listening to: tcp://10.41.18.55:44107
distributed.worker - INFO - nanny at: 10.41.18.55:45941
distributed.worker - INFO - Waiting to connect to: tcp://10.41.18.54:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 1
distributed.worker - INFO - Memory: 512.00 GB
distributed.worker - INFO - Local Directory: /gpfs/alpine/csc342/proj-shared/may/delemera_first_trial_run/worker-kqf62513
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tcp://10.41.18.54:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
So, for that class of idle workers, there was a problem in establishing communication with the scheduler. We did not note any other possibly related messages.
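(Not part of our original script, but for anyone diagnosing similar symptoms: the controller can ask the scheduler directly how many workers registered and what each one is processing, using the standard dask.distributed Client methods scheduler_info() and processing().)

from dask.distributed import Client

client = Client(scheduler_file='scheduler.json')

# Workers the scheduler currently knows about.
workers = client.scheduler_info()['workers']
print(f"{len(workers)} workers registered")

# Worker address -> task keys currently executing on it; an idle
# worker shows up with an empty list.
for address, tasks in client.processing().items():
    print(address, tasks)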
Upvotes: 5
Views: 634
Reputation: 305
It turns out that the problem was not with dask, but with how we invoked the code.
That is, a run-time parameter in our top-level script dictates the size of the initial population sent to the workers. Our original implementation fell back to a default value when that parameter was not specified, and our job submission script omitted the corresponding command line argument, so the default initial population size of five was used.
As configured in the code snippet above, the initial population size, not the number of allocated workers or the size of the pool of evaluated individuals, dictated how many workers would ever be busy. Since we used the default of five, only five tasks were initially given to the scheduler. Then, as each DL finished evaluating, it was added to the pool and exactly one new DL was submitted in its place, so at most five tasks were ever in flight. (The new task was not necessarily assigned to the worker that had just finished evaluating the latest DL, which was an initial source of confusion.) The remaining workers could never be assigned any work.
We've now removed the default value for this parameter to force users to specify this critical information for each run.
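For illustration, a minimal sketch of that change; the flag name --initial-population-size is ours, not necessarily the real one. The point is that the old default of 5 silently capped the number of busy workers, whereas a required argument fails fast instead.

import argparse

parser = argparse.ArgumentParser()
# Previously this argument had default=5; when our batch script
# omitted the flag, only five DL designs were ever in flight.
# Making it required forces the run to fail fast instead.
parser.add_argument('--initial-population-size', type=int, required=True,
                    help='number of DL designs in the initial client.map() batch; '
                         'typically at least the number of dask workers')
args = parser.parse_args()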
Some lessons include:
Upvotes: 2