Reputation: 3591
Is it possible to encapsulate dask parallelization within a class? In its final form my class will do a lot of initialization before calling run - I trimmed my issue down to frame the question. Note that the code works with a LocalCluster, and distributed calculations outside of a class also work fine on the same HPC cluster. Here is the boiled-down code along with the corresponding error messages:
import numpy as np
from dask_jobqueue import PBSCluster
from dask.distributed import Client
from dask.distributed import wait

class Simulate:
    def __init__(self):
        pass

    def run(self):
        cluster = PBSCluster(cores=12, memory='1GB', queue='low', project='classtest',
                             name='classtest_dask', walltime='02:00:00',
                             local_directory='/scratch/mmatthews')
        cluster.scale(10)  # Ask for # workers
        client = Client(cluster)
        seeds = list(np.arange(100))
        a = client.map(self.run_trial, seeds)
        wait(a)
        trial_results = [a[i].result() for i in range(len(a))]
        cluster.scale(0)
        cluster.close()

    def run_trial(self, trial_seed):
        np.random.seed(trial_seed)
        rst = np.random.randint(100)  # draw a random integer for this trial
        print('Simulation Finished rst=%s' % rst)
        return rst

simob = Simulate()
simob.run()
Error to StdErr:
> distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
> distributed.utils - ERROR -
> Traceback (most recent call last):
>   File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/distributed/utils.py", line 666, in log_errors
>     yield
>   File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/distributed/client.py", line 1268, in _close
>     await gen.with_timeout(timedelta(seconds=2), list(coroutines))
> concurrent.futures._base.CancelledError
> distributed.utils - ERROR -
> Traceback (most recent call last):
>   File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/distributed/utils.py", line 666, in log_errors
>     yield
>   File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/distributed/client.py", line 998, in _reconnect
>     await self._close()
>   File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/distributed/client.py", line 1268, in _close
>     await gen.with_timeout(timedelta(seconds=2), list(coroutines))
> concurrent.futures._base.CancelledError
Error in PBS error file:
$ cat classtest_dask.e156272
distributed.nanny - INFO - Start Nanny at: 'tcp://160.84.192.224:40753'
distributed.diskutils - INFO - Found stale lock file and directory '/scratch/mmatthews/worker-bnjpcqmq', purging
distributed.worker - INFO - Start worker at: tcp://160.84.192.224:44564
distributed.worker - INFO - Listening to: tcp://160.84.192.224:44564
distributed.worker - INFO - dashboard at: 160.84.192.224:35232
distributed.worker - INFO - Waiting to connect to: tcp://160.84.192.193:39664
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 12
distributed.worker - INFO - Memory: 1000.00 MB
distributed.worker - INFO - Local Directory: /scratch/mmatthews/worker-kbw6dtj_
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tcp://160.84.192.193:39664
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.dask_worker - INFO - Exiting on signal 15
distributed.nanny - INFO - Closing Nanny at 'tcp://160.84.192.224:40753'
distributed.dask_worker - INFO - End worker
Traceback (most recent call last):
File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/runpy.py", line 193, in _run_module_as_main
"__main__", mod_spec)
File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/runpy.py", line 85, in _run_code
exec(code, run_globals)
File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/distributed/cli/dask_worker.py", line 410, in <module>
go()
File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/distributed/cli/dask_worker.py", line 406, in go
main()
File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/click/core.py", line 764, in __call__
return self.main(*args, **kwargs)
File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/click/core.py", line 717, in main
rv = self.invoke(ctx)
File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/click/core.py", line 956, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/click/core.py", line 555, in invoke
return callback(*args, **kwargs)
File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/distributed/cli/dask_worker.py", line 397, in main
raise TimeoutError("Timed out starting worker.") from None
tornado.util.TimeoutError: Timed out starting worker.
Upvotes: 1
Views: 225
Reputation: 57281
Can dask parallelization be encapsulated in a class?
Yes. Dask calls are just normal Python calls. There is nothing that stops them from interacting with the rest of the language.
Your actual error seems to be entirely unrelated. It looks like something killed your worker:
distributed.dask_worker - INFO - Exiting on signal 15
Unfortunately there is no information about what that was. I recommend checking with your system administrator.
Upvotes: 1