tnt

Reputation: 3591

Can dask parallelization be encapsulated in a class?

Is it possible to encapsulate dask parallelization within a class? In its final form, my class will do a lot of initialization before calling run; I have trimmed the problem down to frame the question. Note that this code works with a LocalCluster, and distributed calculations outside of a class work fine on the same HPC cluster. Here is the boiled-down code along with the corresponding error messages:

import numpy as np
from dask_jobqueue import PBSCluster
from dask.distributed import Client
from dask.distributed import wait

class Simulate:
    def __init__(self):
        pass

    def run(self):
        cluster = PBSCluster(cores=12, memory='1GB', queue='low', project='classtest', name='classtest_dask',
                             walltime='02:00:00', local_directory='/scratch/mmatthews')
        cluster.scale(10)  # Ask for # workers
        client = Client(cluster)

        seeds = list(np.arange(100))
        a = client.map(self.run_trial, seeds)
        wait(a)

        trial_results = [a[i].result() for i in range(len(a))]

        cluster.scale(0)
        cluster.close()

    def run_trial(self, trial_seed):
        np.random.seed(trial_seed)
        rst = np.random.randint(100)  # call randint rather than returning the function itself; the bound 100 is illustrative
        print('Simulation Finished rst=%s' % rst)
        return rst

simob = Simulate()
simob.run()

Error to StdErr:

> distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
> distributed.utils - ERROR -
> Traceback (most recent call last):
>   File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/distributed/utils.py", line 666, in log_errors
>     yield
>   File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/distributed/client.py", line 1268, in _close
>     await gen.with_timeout(timedelta(seconds=2), list(coroutines))
> concurrent.futures._base.CancelledError
> distributed.utils - ERROR -
> Traceback (most recent call last):
>   File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/distributed/utils.py", line 666, in log_errors
>     yield
>   File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/distributed/client.py", line 998, in _reconnect
>     await self._close()
>   File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/distributed/client.py", line 1268, in _close
>     await gen.with_timeout(timedelta(seconds=2), list(coroutines))
> concurrent.futures._base.CancelledError

Error in PBS error file:

$ cat classtest_dask.e156272
distributed.nanny - INFO -         Start Nanny at: 'tcp://160.84.192.224:40753'
distributed.diskutils - INFO - Found stale lock file and directory '/scratch/mmatthews/worker-bnjpcqmq', purging
distributed.worker - INFO -       Start worker at: tcp://160.84.192.224:44564
distributed.worker - INFO -          Listening to: tcp://160.84.192.224:44564
distributed.worker - INFO -          dashboard at:       160.84.192.224:35232
distributed.worker - INFO - Waiting to connect to: tcp://160.84.192.193:39664
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                         12
distributed.worker - INFO -                Memory:                 1000.00 MB
distributed.worker - INFO -       Local Directory: /scratch/mmatthews/worker-kbw6dtj_
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to: tcp://160.84.192.193:39664
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.dask_worker - INFO - Exiting on signal 15
distributed.nanny - INFO - Closing Nanny at 'tcp://160.84.192.224:40753'
distributed.dask_worker - INFO - End worker
Traceback (most recent call last):
  File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/distributed/cli/dask_worker.py", line 410, in <module>
    go()
  File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/distributed/cli/dask_worker.py", line 406, in go
    main()
  File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/click/core.py", line 764, in __call__
    return self.main(*args, **kwargs)
  File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/click/core.py", line 717, in main
    rv = self.invoke(ctx)
  File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/click/core.py", line 956, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/click/core.py", line 555, in invoke
    return callback(*args, **kwargs)
  File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/distributed/cli/dask_worker.py", line 397, in main
    raise TimeoutError("Timed out starting worker.") from None
tornado.util.TimeoutError: Timed out starting worker.

Upvotes: 1

Views: 225

Answers (1)

MRocklin

Reputation: 57281

Can dask parallelization be encapsulated in a class?

Yes. Dask calls are just normal Python calls. There is nothing that stops them from interacting with the rest of the language.
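
As a minimal sketch of the pattern (not your actual setup), here is a class that maps one of its own methods over a list of seeds. To keep it runnable without a cluster, it uses the standard library's ThreadPoolExecutor as a stand-in for dask; the n_trials parameter and the executor argument to run are hypothetical names chosen for this illustration.

```python
import random
from concurrent.futures import ThreadPoolExecutor


class Simulate:
    """Does its own setup, then fans trials out through an executor."""

    def __init__(self, n_trials=8):
        # n_trials is a hypothetical parameter for this sketch.
        self.n_trials = n_trials

    def run_trial(self, trial_seed):
        # A per-trial generator avoids sharing global random state between workers.
        rng = random.Random(trial_seed)
        return rng.randint(0, 99)

    def run(self, executor):
        # Passing the bound method is fine: the executor simply calls it once per seed.
        return list(executor.map(self.run_trial, range(self.n_trials)))


# ThreadPoolExecutor is a stand-in so the sketch runs without a cluster.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = Simulate().run(pool)

print(results)
```

With dask, the object returned by Client.get_executor() follows the same concurrent.futures interface, or run can call client.map(self.run_trial, seeds) directly as in the question. In that case the instance is serialized and sent to the workers together with the method, so it helps to keep it small and picklable.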

Your actual error seems to be entirely unrelated. It looks like something killed your worker:

distributed.dask_worker - INFO - Exiting on signal 15

Unfortunately, the log gives no information about what sent that signal. I recommend checking with your system administrator.
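
For reference, signal 15 is SIGTERM, the ordinary termination request (what kill sends by default, and what batch schedulers commonly send when a job is stopped), so the worker was asked to shut down rather than crashing on its own. A quick way to confirm the name, using only Python's standard signal module:

```python
import signal

# Look up the symbolic name for the signal number reported in the worker log.
sig = signal.Signals(15)
print(sig.name)
```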

Upvotes: 1
