Luca

Reputation: 21

How do I correctly set worker ports in a Dask distributed SSHCluster?

I'm trying to use Dask to distribute work from a machine (call it A) across 4 servers in a data center (call them B, C, D and E). A should set up the SSHCluster, assigning the scheduler to live on B, which should then spawn workers on B, C, D, and E. The catch is that only some ports are open, so they must be specified explicitly. That's easy to do for the scheduler, but I can't get it to work for the workers.
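For reference, here is roughly how everything is set up. The hostnames and scheduler port below are placeholders for my real ones; giving the scheduler a fixed open port via scheduler_options works fine:

import distributed

# Placeholder hostnames for the machines in the data center.
scheduler_location = 'B.example.com'
worker_locations = ['B.example.com', 'C.example.com',
                    'D.example.com', 'E.example.com']
procs_per_node = 4

# Setting the scheduler's port works as expected: scheduler_options
# is forwarded to the scheduler that SSHCluster starts on B.
cluster = distributed.SSHCluster([scheduler_location] + list(worker_locations),
                                 scheduler_options={'port': 8786})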

If the ports are not specified, A successfully starts the scheduler on B. The scheduler then believes it has successfully started all the workers on random ports, but when gathering results it finds that it can only contact the workers on B. This makes sense so far. Code for this:

cluster = distributed.SSHCluster([scheduler_location] + list(worker_locations),
                                 worker_options={
                                     'nprocs': procs_per_node, 
                                     'nthreads': 1,
                                 })

As soon as I try to set the ports for the workers, the workers fail to start. This happens no matter what input I give. I've tried starting one worker on each server, specifying the port to use as an int:

cluster = distributed.SSHCluster([scheduler_location] + list(worker_locations),
                                 worker_options={
                                     'nprocs': procs_per_node, 
                                     'nthreads': 1,
                                     'port': 60000,
                                 })

I've tried starting multiple workers on each server, giving a range of ports to use:

cluster = distributed.SSHCluster([scheduler_location] + list(worker_locations),
                                 worker_options={
                                     'nprocs': procs_per_node, 
                                     'nthreads': 1,
                                     'port': '{}:{}'.format(
                                         60000, 60000 + procs_per_node - 1),
                                 })

I've tried starting multiple workers on each server, giving the full range of ports available:

cluster = distributed.SSHCluster([scheduler_location] + list(worker_locations),
                                 worker_options={
                                     'nprocs': procs_per_node, 
                                     'nthreads': 1,
                                     'port': '60000:61000'
                                 })

Each time it returns four errors (one each from B, C, D, and E) saying "Exception: Worker failed to start".

In summary, these are my questions: what is the correct way to specify the ports the workers should use in an SSHCluster, and why do all of the attempts above fail with the same error?

For reference, here are the versions I'm using (they might not all be relevant): python 3.8.3, dask 2.18.1, dask-core 2.18.1, distributed 2.18.0, tornado 6.0.4, bokeh 2.0.1

Upvotes: 2

Views: 1297

Answers (1)

abduh

Reputation: 43

This seems to work fine when using worker_port instead of port:

cluster = distributed.SSHCluster([scheduler_location] + list(worker_locations),
                                 worker_options={
                                     'nprocs': procs_per_node, 
                                     'nthreads': 1,
                                     'worker_port': '60000:61000'
                                 })

See where the dask-worker CLI defines the worker-port option:

https://github.com/dask/distributed/blob/93701f82c2cef46d4e68696bf48af0fc65ea9159/distributed/cli/dask_worker.py#L54
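As a quick sanity check (assuming the cluster object from the snippet above), you can connect a client and print the worker addresses; each address includes the port the worker actually bound to, and they should all fall within the requested range:

from distributed import Client

client = Client(cluster)

# scheduler_info() reports each worker by its address, e.g.
# 'tcp://10.0.0.2:60000'; the port should lie in 60000-61000.
for addr in client.scheduler_info()['workers']:
    print(addr)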

Upvotes: 2
