Reputation: 33
As my data sets get larger (and therefore the number and size of partitions larger), the workers within my distributed Dask cluster eventually fail with connection timeouts between each other.
For example, I repeatedly see error logs like (paths and IPs obfuscated):
distributed.worker - ERROR - Worker stream died during communication: tcp://123.123.123.123:41076
Traceback (most recent call last):
File "/path/to/my/code/venv/lib/python3.8/site-packages/distributed/comm/core.py", line 321, in connect
await asyncio.wait_for(comm.write(local_info), time_left())
File "/share/apps/python/miniconda3.8/lib/python3.8/asyncio/tasks.py", line 464, in wait_for
raise exceptions.TimeoutError()
asyncio.exceptions.TimeoutError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/path/to/my/code/venv/lib/python3.8/site-packages/distributed/worker.py", line 2334, in gather_dep
response = await get_data_from_worker(
File "/path/to/my/code/venv/lib/python3.8/site-packages/distributed/worker.py", line 3753, in get_data_from_worker
return await retry_operation(_get_data, operation="get_data_from_worker")
File "/path/to/my/code/venv/lib/python3.8/site-packages/distributed/utils_comm.py", line 385, in retry_operation
return await retry(
File "/path/to/my/code/venv/lib/python3.8/site-packages/distributed/utils_comm.py", line 370, in retry
return await coro()
File "/path/to/my/code/venv/lib/python3.8/site-packages/distributed/worker.py", line 3730, in _get_data
comm = await rpc.connect(worker)
File "/path/to/my/code/venv/lib/python3.8/site-packages/distributed/core.py", line 1012, in connect
comm = await connect(
File "/path/to/my/code/venv/lib/python3.8/site-packages/distributed/comm/core.py", line 325, in connect
raise IOError(
OSError: Timed out during handshake while connecting to tcp://123.123.123.123:41076 after 10 s
Based on logged IPs and ports, I know that these are errors with connections between workers, not workers to the scheduler.
The stack traces do not include any references to my code (rather to Dask in my venv), but I do suspect something I am doing in Dask is leading to the issue. Earlier tasks, both custom delayed and DataFrame functions have completed successfully.
Based on watching logs and dashboard while the system fails, I suspect I am having problems in a groupby aggregation. I've already worked through out of memory issues caused by data shuffling by partitioning the data such that the grouped data is contained within each partition (no shuffling should be required). I have confirmed by not seeing any shuffle tasks running in the dashboard. Though the stack trace logged still shows a Dask worker attempting to retrieve data from another using the gather_dep
function.
I am trying to understand better when workers communicate with each other to gauge what I may be doing to cause the issue?
Incrementally increasing the timeout configurations only makes the timeouts take longer. The workers still eventually die while connecting to communicate between each other.
Any other suggestions on how to debug the issue?
Upvotes: 2
Views: 1219
Reputation: 33
This page helps describe the Dask groupby aggregation process: https://saturncloud.io/docs/reference/dask_groupby_aggregations/
In particular, if your partitions are already sorted based on the groupby columns, you can use map_partitions()
to process each partition individually. Do take care in that the partition will need to be small enough to be stored in memory. I'm not clear if that is if the whole DataFrame needs to be in memory, or if Dask is careful enough to only require each partition in memory individually. https://saturncloud.io/docs/reference/dask_groupby_aggregations/#use-map_partitions-instead
I've been able to use map_partitions to get around the worker-to-worker communications & timeout issues I had earlier. This doesn't explain why Dask is trying to communicate with other workers when the DataFrame is already partitioned on one of the groupby columns, but it will at least get you moving forward if you find yourself in a similar situation.
Upvotes: 0