Reputation: 283
I'm parallelizing a function over 32 cores and am having trouble accessing a shared dataframe dask_paths. All the code works correctly when I remove the line (and the lines that depend on it) dask_paths[dask_paths['od'] == (o, d)].compute(). Incredibly, if I compute this for some fixed o, d outside the distributed code and then use that result, I get what I want (for that o, d). So it really is the act of accessing dask_paths inside the parallel computation that is failing. I am using the logic given here for "embarrassingly parallelizable for loops" in the dask docs. Moreover, I used to use get_group on a global pd.DataFrame grouped for this logic, and it suffered from the same problem of accessing a global object (even though the serial version finishes in a couple of seconds, the parallel computation stalls before eventually printing a cryptic error message, given at the bottom). I don't know why this is.
Note that dask_paths is a Dask.dataframe. This is the most basic of logic in parallelizing with dask, so I'm not sure why it's failing. I am working in a Vertex AI Jupyter notebook on Google Cloud. There is no error trace, because the program simply stalls. All the data/dataframes have been defined in the global environment of the notebook in the cells above and are working fine. The Vertex AI notebook has 16 vCPUs and 100 GB RAM and runs on a Google Cloud VM. There is no reading or writing to any files, so that's not the issue.
# dask_paths['od'] takes on values like (100, 150)
# popular takes the form [[20, 25], [100, 150], [67, 83], ...]
# and has 2000 elements; every element is a list of length 2
def main():
    def pop2unique(pop):
        df = dask_paths[dask_paths['od'] == (pop[0], pop[1])].compute()
        return df['last'].sum()

    lzs = []
    ncores = 32
    dask_client.cluster.scale(10)
    futures = dask_client.map(pop2unique, popular[:10])  # this stalls
    results = dask_client.gather(futures)
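For comparison, the serial version mentioned above - computing the filter for a fixed o, d outside the distributed code - does work. A minimal sketch, assuming dask_paths and popular as defined here:
# Serial sketch of the same lookup; as described above, this completes fine
# when run outside dask_client.map
o, d = popular[0]
df = dask_paths[dask_paths['od'] == (o, d)].compute()
print(df['last'].sum())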
And dask_paths takes the form:
   index        od  last
  605096  (22, 25)    10
  999336 (103, 88)    31
 1304512 (101, 33)     9
 1432383 (768, 21)    16
The client being used everywhere is given by:
from dask.distributed import Client, progress
dask_client = Client(threads_per_worker=4, n_workers=8)
The error message I get is long and cryptic:
distributed.diskutils - INFO - Found stale lock file and directory '/home/jupyter/dask-worker-space/worker-9y17gy_r', purging
distributed.scheduler - WARNING - Worker tried to connect with a duplicate name: 33
distributed.scheduler - WARNING - Worker tried to connect with a duplicate name: 35
distributed.nanny - ERROR - Failed to start worker
Traceback (most recent call last):
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 884, in run
await worker
File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
await self.start()
File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1584, in start
await self._register_with_scheduler()
File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1299, in _register_with_scheduler
raise ValueError(f"Unexpected response from register: {response!r}")
ValueError: Unexpected response from register: {'status': 'error', 'message': 'name taken, 33', 'time': 1658860560.8544912}
distributed.scheduler - WARNING - Worker tried to connect with a duplicate name: 36
distributed.scheduler - WARNING - Worker tried to connect with a duplicate name: 31
distributed.nanny - ERROR - Failed to start worker
Traceback (most recent call last):
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 884, in run
await worker
File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
await self.start()
File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1584, in start
await self._register_with_scheduler()
File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1299, in _register_with_scheduler
raise ValueError(f"Unexpected response from register: {response!r}")
ValueError: Unexpected response from register: {'status': 'error', 'message': 'name taken, 35', 'time': 1658860560.8563268}
distributed.nanny - ERROR - Failed to start worker
Traceback (most recent call last):
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 884, in run
await worker
File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
await self.start()
File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1584, in start
await self._register_with_scheduler()
File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1299, in _register_with_scheduler
raise ValueError(f"Unexpected response from register: {response!r}")
ValueError: Unexpected response from register: {'status': 'error', 'message': 'name taken, 36', 'time': 1658860560.8576422}
distributed.scheduler - WARNING - Worker tried to connect with a duplicate name: 34
distributed.nanny - ERROR - Failed to start worker
Traceback (most recent call last):
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 884, in run
await worker
File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
await self.start()
File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1584, in start
await self._register_with_scheduler()
File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1299, in _register_with_scheduler
raise ValueError(f"Unexpected response from register: {response!r}")
ValueError: Unexpected response from register: {'status': 'error', 'message': 'name taken, 31', 'time': 1658860560.8595085}
distributed.scheduler - WARNING - Worker tried to connect with a duplicate name: 32
distributed.nanny - ERROR - Failed to start worker
Traceback (most recent call last):
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 884, in run
await worker
File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
await self.start()
File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1584, in start
await self._register_with_scheduler()
File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1299, in _register_with_scheduler
raise ValueError(f"Unexpected response from register: {response!r}")
ValueError: Unexpected response from register: {'status': 'error', 'message': 'name taken, 34', 'time': 1658860560.8609138}
distributed.nanny - ERROR - Failed to start worker
Traceback (most recent call last):
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 884, in run
await worker
File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
await self.start()
File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1584, in start
await self._register_with_scheduler()
File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1299, in _register_with_scheduler
raise ValueError(f"Unexpected response from register: {response!r}")
ValueError: Unexpected response from register: {'status': 'error', 'message': 'name taken, 32', 'time': 1658860560.862359}
distributed.nanny - ERROR - Failed while trying to start worker process: Unexpected response from register: {'status': 'error', 'message': 'name taken, 36', 'time': 1658860560.8576422}
distributed.nanny - ERROR - Failed while trying to start worker process: Unexpected response from register: {'status': 'error', 'message': 'name taken, 31', 'time': 1658860560.8595085}
distributed.nanny - ERROR - Failed while trying to start worker process: Unexpected response from register: {'status': 'error', 'message': 'name taken, 35', 'time': 1658860560.8563268}
distributed.nanny - ERROR - Failed while trying to start worker process: Unexpected response from register: {'status': 'error', 'message': 'name taken, 33', 'time': 1658860560.8544912}
distributed.nanny - ERROR - Failed while trying to start worker process: Unexpected response from register: {'status': 'error', 'message': 'name taken, 34', 'time': 1658860560.8609138}
distributed.nanny - ERROR - Failed while trying to start worker process: Unexpected response from register: {'status': 'error', 'message': 'name taken, 32', 'time': 1658860560.862359}
Task exception was never retrieved
future: <Task finished coro=<_wrap_awaitable() done, defined at /opt/conda/lib/python3.7/asyncio/tasks.py:623> exception=ValueError("Unexpected response from register: {'status': 'error', 'message': 'name taken, 32', 'time': 1658860560.862359}")>
Traceback (most recent call last):
File "/opt/conda/lib/python3.7/asyncio/tasks.py", line 630, in _wrap_awaitable
return (yield from awaitable.__await__())
File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
await self.start()
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 334, in start
response = await self.instantiate()
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 417, in instantiate
result = await self.process.start()
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 694, in start
msg = await self._wait_until_connected(uid)
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 812, in _wait_until_connected
raise msg["exception"]
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 884, in run
await worker
File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
await self.start()
File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1584, in start
await self._register_with_scheduler()
File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1299, in _register_with_scheduler
raise ValueError(f"Unexpected response from register: {response!r}")
ValueError: Unexpected response from register: {'status': 'error', 'message': 'name taken, 32', 'time': 1658860560.862359}
Task exception was never retrieved
future: <Task finished coro=<_wrap_awaitable() done, defined at /opt/conda/lib/python3.7/asyncio/tasks.py:623> exception=ValueError("Unexpected response from register: {'status': 'error', 'message': 'name taken, 36', 'time': 1658860560.8576422}")>
Traceback (most recent call last):
File "/opt/conda/lib/python3.7/asyncio/tasks.py", line 630, in _wrap_awaitable
return (yield from awaitable.__await__())
File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
await self.start()
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 334, in start
response = await self.instantiate()
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 417, in instantiate
result = await self.process.start()
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 694, in start
msg = await self._wait_until_connected(uid)
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 812, in _wait_until_connected
raise msg["exception"]
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 884, in run
await worker
File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
await self.start()
File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1584, in start
await self._register_with_scheduler()
File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1299, in _register_with_scheduler
raise ValueError(f"Unexpected response from register: {response!r}")
ValueError: Unexpected response from register: {'status': 'error', 'message': 'name taken, 36', 'time': 1658860560.8576422}
Task exception was never retrieved
future: <Task finished coro=<_wrap_awaitable() done, defined at /opt/conda/lib/python3.7/asyncio/tasks.py:623> exception=ValueError("Unexpected response from register: {'status': 'error', 'message': 'name taken, 35', 'time': 1658860560.8563268}")>
Traceback (most recent call last):
File "/opt/conda/lib/python3.7/asyncio/tasks.py", line 630, in _wrap_awaitable
return (yield from awaitable.__await__())
File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
await self.start()
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 334, in start
response = await self.instantiate()
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 417, in instantiate
result = await self.process.start()
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 694, in start
msg = await self._wait_until_connected(uid)
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 812, in _wait_until_connected
raise msg["exception"]
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 884, in run
await worker
File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
await self.start()
File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1584, in start
await self._register_with_scheduler()
File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1299, in _register_with_scheduler
raise ValueError(f"Unexpected response from register: {response!r}")
ValueError: Unexpected response from register: {'status': 'error', 'message': 'name taken, 35', 'time': 1658860560.8563268}
Task exception was never retrieved
future: <Task finished coro=<_wrap_awaitable() done, defined at /opt/conda/lib/python3.7/asyncio/tasks.py:623> exception=ValueError("Unexpected response from register: {'status': 'error', 'message': 'name taken, 34', 'time': 1658860560.8609138}")>
Traceback (most recent call last):
File "/opt/conda/lib/python3.7/asyncio/tasks.py", line 630, in _wrap_awaitable
return (yield from awaitable.__await__())
File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
await self.start()
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 334, in start
response = await self.instantiate()
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 417, in instantiate
result = await self.process.start()
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 694, in start
msg = await self._wait_until_connected(uid)
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 812, in _wait_until_connected
raise msg["exception"]
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 884, in run
await worker
File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
await self.start()
File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1584, in start
await self._register_with_scheduler()
File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1299, in _register_with_scheduler
raise ValueError(f"Unexpected response from register: {response!r}")
ValueError: Unexpected response from register: {'status': 'error', 'message': 'name taken, 34', 'time': 1658860560.8609138}
Task exception was never retrieved
future: <Task finished coro=<_wrap_awaitable() done, defined at /opt/conda/lib/python3.7/asyncio/tasks.py:623> exception=ValueError("Unexpected response from register: {'status': 'error', 'message': 'name taken, 33', 'time': 1658860560.8544912}")>
Traceback (most recent call last):
File "/opt/conda/lib/python3.7/asyncio/tasks.py", line 630, in _wrap_awaitable
return (yield from awaitable.__await__())
File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
await self.start()
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 334, in start
response = await self.instantiate()
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 417, in instantiate
result = await self.process.start()
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 694, in start
msg = await self._wait_until_connected(uid)
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 812, in _wait_until_connected
raise msg["exception"]
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 884, in run
await worker
File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
await self.start()
File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1584, in start
await self._register_with_scheduler()
File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1299, in _register_with_scheduler
raise ValueError(f"Unexpected response from register: {response!r}")
ValueError: Unexpected response from register: {'status': 'error', 'message': 'name taken, 33', 'time': 1658860560.8544912}
Task exception was never retrieved
future: <Task finished coro=<_wrap_awaitable() done, defined at /opt/conda/lib/python3.7/asyncio/tasks.py:623> exception=ValueError("Unexpected response from register: {'status': 'error', 'message': 'name taken, 31', 'time': 1658860560.8595085}")>
Traceback (most recent call last):
File "/opt/conda/lib/python3.7/asyncio/tasks.py", line 630, in _wrap_awaitable
return (yield from awaitable.__await__())
File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
await self.start()
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 334, in start
response = await self.instantiate()
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 417, in instantiate
result = await self.process.start()
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 694, in start
msg = await self._wait_until_connected(uid)
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 812, in _wait_until_connected
raise msg["exception"]
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 884, in run
await worker
File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
await self.start()
File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1584, in start
await self._register_with_scheduler()
File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1299, in _register_with_scheduler
raise ValueError(f"Unexpected response from register: {response!r}")
ValueError: Unexpected response from register: {'status': 'error', 'message': 'name taken, 31', 'time': 1658860560.8595085}
distributed.diskutils - INFO - Found stale lock file and directory '/home/jupyter/dask-worker-space/worker-xd5jxrin', purging
distributed.diskutils - INFO - Found stale lock file and directory '/home/jupyter/dask-worker-space/worker-w_fmefrs', purging
distributed.diskutils - INFO - Found stale lock file and directory '/home/jupyter/dask-worker-space/worker-djg8ki4m', purging
distributed.diskutils - INFO - Found stale lock file and directory '/home/jupyter/dask-worker-space/worker-ho1hw10b', purging
distributed.diskutils - INFO - Found stale lock file and directory '/home/jupyter/dask-worker-space/worker-mbdw10vg', purging
distributed.diskutils - INFO - Found stale lock file and directory '/home/jupyter/dask-worker-space/worker-whk890cp', purging
distributed.scheduler - WARNING - Worker tried to connect with a duplicate name: 32
distributed.nanny - ERROR - Failed to start worker
Traceback (most recent call last):
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 884, in run
await worker
File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
await self.start()
File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1584, in start
await self._register_with_scheduler()
File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1299, in _register_with_scheduler
raise ValueError(f"Unexpected response from register: {response!r}")
ValueError: Unexpected response from register: {'status': 'error', 'message': 'name taken, 32', 'time': 1658860564.692771}
distributed.nanny - ERROR - Failed while trying to start worker process: Unexpected response from register: {'status': 'error', 'message': 'name taken, 32', 'time': 1658860564.692771}
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <zmq.eventloop.ioloop.ZMQIOLoop object at 0x7f82cf7eba10>>, <Task finished coro=<SpecCluster._correct_state_internal() done, defined at /opt/conda/lib/python3.7/site-packages/distributed/deploy/spec.py:310> exception=ValueError("Unexpected response from register: {'status': 'error', 'message': 'name taken, 32', 'time': 1658860564.692771}")>)
Traceback (most recent call last):
File "/opt/conda/lib/python3.7/site-packages/tornado/ioloop.py", line 741, in _run_callback
ret = callback()
File "/opt/conda/lib/python3.7/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
future.result()
File "/opt/conda/lib/python3.7/site-packages/distributed/deploy/spec.py", line 348, in _correct_state_internal
await w # for tornado gen.coroutine support
File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
await self.start()
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 334, in start
response = await self.instantiate()
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 417, in instantiate
result = await self.process.start()
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 694, in start
msg = await self._wait_until_connected(uid)
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 812, in _wait_until_connected
raise msg["exception"]
File "/opt/conda/lib/python3.7/site-packages/distributed/nanny.py", line 884, in run
await worker
File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 279, in _
await self.start()
File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1584, in start
await self._register_with_scheduler()
File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 1299, in _register_with_scheduler
raise ValueError(f"Unexpected response from register: {response!r}")
ValueError: Unexpected response from register: {'status': 'error', 'message': 'name taken, 32', 'time': 1658860564.692771}
Upvotes: 0
Views: 274
Reputation: 28683
The errors you are seeing might not be related to your workflow - maybe a version conflict or similar.
However, you are mixing dask API paradigms. You have created a dask dataframe - which understands how to partition operations for dask to compute - but then chosen to create tasks manually yourself. This is a bad idea. Dask tasks should generally operate on one partition of a normal data structure (in this case, a pandas dataframe), not on a dask collection (in this case, the dask dataframe). It may be (I am not sure) that the attempt to serialise the dask dataframe and deserialise it on the workers is what is causing them to fail to start properly.
Your workflow at first glance looks like a full shuffle, but in fact it does parallelise OK, because you can group by within each partition and then sum the results:
def per_partition_op(df):
    # df here is one partition of the dask dataframe, i.e. a plain pandas DataFrame
    return df.groupby("od")["last"].sum()

df2 = dask_paths.map_partitions(per_partition_op)
At this point you can just compute and work with the partials series, since it should already be of a manageable size:
partials = df2.compute()                    # pandas Series of per-partition sums, indexed by od
results = partials.groupby(level=0).sum()   # combine the partial sums across partitions
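If you then only need these sums for the pairs in popular (as in the question's pop2unique), one way to finish - a sketch on my part, assuming results and popular as above and that the od values are plain tuples:
# Hypothetical follow-up, not part of the answer above: pick out the
# summed 'last' value for each popular (o, d) pair from the combined series.
result_map = results.to_dict()                          # {(o, d): summed 'last', ...}
lzs = [result_map.get(tuple(od), 0) for od in popular]
Alternatively, it may be possible (I have not tested this with tuple-valued group keys) to let dask do the whole aggregation itself:
totals = dask_paths.groupby("od")["last"].sum().compute()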
Upvotes: 1