Reputation: 400
I'm queuing a task with the command sbatch
and the next configuration:
#SBATCH --job-name=dask-test
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=10
#SBATCH --mem=80G
#SBATCH --time=00:30:00
#SBATCH --tmp=10G
#SBATCH --partition=normal
#SBATCH --qos=normal
python ./dask-test.py
The python script is, more or less, as follows:
import pandas as pd
import dask.dataframe as dd
import numpy as np
from dask.distributed import Client, LocalCluster
print("Generating LocalCluster...")
cluster = LocalCluster()
print("Generating Client...")
client = Client(cluster, processes=False)
print("Scaling client...")
client.scale(8)
data = dd.read_csv(
BASE_DATA_SOURCE + '/Data-BIGDATFILES-*.csv',
delimiter=';',
)
def get_min_dt():
min_dt = data.datetime.min().compute()
print("Min is {}".format())
print("Getting min dt...")
get_min_dt()
The first problem is that the texts "Generating LocalCluster..." is printed 6 times, which makes me wonder if the script is running multiple times concurrently. Secondly, after some minutes of printing nothing, I receive the following messages:
/anaconda3/lib/python3.7/site-packages/distributed/node.py:155: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 37396 instead
http_address["port"], self.http_server.port
many times.. and finally the next one, also many times:
Task exception was never retrieved
future: <Task finished coro=<_wrap_awaitable() done, defined at /cluster/home/user/anaconda3/lib/python3.7/asyncio/tasks.py:592> exception=RuntimeError('\n An attempt has been made to start a new process before the\n current process has finished its bootstrapping phase.\n\n This probably means that you are not using fork to start your\n child processes and you have forgotten to use the proper idiom\n in the main module:\n\n if __name__ == \'__main__\':\n freeze_support()\n ...\n\n The "freeze_support()" line can be omitted if the program\n is not going to be frozen to produce an executable.')>
Traceback (most recent call last):
File "/cluster/home/user/anaconda3/lib/python3.7/asyncio/tasks.py", line 599, in _wrap_awaitable
return (yield from awaitable.__await__())
File "/cluster/home/user/anaconda3/lib/python3.7/site-packages/distributed/core.py", line 290, in _
await self.start()
File "/cluster/home/user/anaconda3/lib/python3.7/site-packages/distributed/nanny.py", line 295, in start
response = await self.instantiate()
File "/cluster/home/user/anaconda3/lib/python3.7/site-packages/distributed/nanny.py", line 378, in instantiate
result = await self.process.start()
File "/cluster/home/user/anaconda3/lib/python3.7/site-packages/distributed/nanny.py", line 575, in start
await self.process.start()
File "/cluster/home/user/anaconda3/lib/python3.7/site-packages/distributed/process.py", line 34, in _call_and_set_future
res = func(*args, **kwargs)
File "/cluster/home/user/anaconda3/lib/python3.7/site-packages/distributed/process.py", line 202, in _start
process.start()
File "/cluster/home/user/anaconda3/lib/python3.7/multiprocessing/process.py", line 112, in start
self._popen = self._Popen(self)
File "/cluster/home/user/anaconda3/lib/python3.7/multiprocessing/context.py", line 284, in _Popen
return Popen(process_obj)
File "/cluster/home/user/anaconda3/lib/python3.7/multiprocessing/popen_spawn_posix.py", line 32, in __init__
super().__init__(process_obj)
File "/cluster/home/user/anaconda3/lib/python3.7/multiprocessing/popen_fork.py", line 20, in __init__
self._launch(process_obj)
File "/cluster/home/user/anaconda3/lib/python3.7/multiprocessing/popen_spawn_posix.py", line 42, in _launch
prep_data = spawn.get_preparation_data(process_obj._name)
File "/cluster/home/user/anaconda3/lib/python3.7/multiprocessing/spawn.py", line 143, in get_preparation_data
_check_not_importing_main()
File "/cluster/home/user/anaconda3/lib/python3.7/multiprocessing/spawn.py", line 136, in _check_not_importing_main
is not going to be frozen to produce an executable.''')
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
I already tried adding more cores, more memory, setting processes=False
at the instantiation of Client
, and many other things but I can't figure it out what's the problem.
The used library/software versions are:
Am I setting something wrong? Is the way of using local cluster and client structures correct?
Upvotes: 1
Views: 522
Reputation: 400
After some research, I could get a solution. Not too sure about the cause but very sure it works.
The instantiation of LocalCluster, Client and all the code after it (the code that will be distributed executed) must NOT be at module level of the Python script. Instead, this code must be in a method or inside the __main__ block, as follows:
import pandas as pd
import dask.dataframe as dd
import numpy as np
from dask.distributed import Client, LocalCluster
if __name__ == "__main__":
print("Generating LocalCluster...")
cluster = LocalCluster()
print("Generating Client...")
client = Client(cluster, processes=False)
print("Scaling client...")
client.scale(8)
data = dd.read_csv(
BASE_DATA_SOURCE + '/Data-BIGDATFILES-*.csv',
delimiter=';',
)
def get_min_dt():
min_dt = data.datetime.min().compute()
print("Min is {}".format())
print("Getting min dt...")
get_min_dt()
This simple change makes a difference. The solution was found in that issues thread: https://github.com/dask/distributed/issues/2520#issuecomment-470817810
Upvotes: 1