NirIzr

Reputation: 3410

Dask distributed Failed to deserialize with numpy.arrays and sparse.matrices

I'm getting the following error multiple times on different tasks along the graph (it changes between executions), possibly when certain tasks return numpy.arrays and scipy.sparse matrices.

distributed.protocol.pickle - INFO - Failed to deserialize b'\x80\x04'
Traceback (most recent call last):
  File "/home/user/venv/lib/python3.5/site-packages/distributed/protocol/pickle.py", line 59, in loads
    return pickle.loads(x)
EOFError: Ran out of input
distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/home/user/venv/lib/python3.5/site-packages/distributed/protocol/core.py", line 119, in loads
    value = _deserialize(head, fs)
  File "/home/user/venv/lib/python3.5/site-packages/distributed/protocol/serialize.py", line 158, in deserialize
    return f(header, frames)
  File "/home/user/venv/lib/python3.5/site-packages/distributed/protocol/serialize.py", line 20, in <lambda>
    deserializers = {None: lambda header, frames: pickle.loads(b''.join(frames))}
  File "/home/user/venv/lib/python3.5/site-packages/distributed/protocol/pickle.py", line 59, in loads
    return pickle.loads(x)
EOFError: Ran out of input
distributed.comm.utils - ERROR - truncated data stream (485 bytes): [b'', b"\x92\x83\xa6report\xc2\xa4keys\x91\xd9P('_avro_body-read-block-bag-from-delayed-67c7a9690149de9743ed970f873fa1d6', 283)\xa2op\xabdelete-data\x86\xa8priority\x93\x00\x01\xcc\xce\xa6nbytes\x81\xd9:('bag-from-delayed-67c7a9690149de9743ed970f873fa1d6', 283)\xce\x00 \x86p\xa8duration\xcb@\x18\x16m\x88xX\x00\xa7who_has\x81\xd9:('bag-from-delayed-67c7a9690149de9743ed970f873fa1d6', 283)\x91\xb5tcp://127.0.0.1:38623\xa2op\xaccompute-task\xa3key\xd9K('pluck-map-process_features_sparse-d94d304dc59efb780c39bfb0ca4df37f', 283)", b'\x83\xabbytestrings\x90\xa7headers\x81\x92\x01\xa4task\x83\xabcompression\x91\xc0\xa5count\x01\xa7lengths\x91\x02\xa4keys\x91\x92\x01\xa4task', b'\x80\x04']
distributed.worker - INFO - Connection to scheduler broken. Reregistering
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:       tcp://127.0.0.1:8786
distributed.worker - INFO - -------------------------------------------------

It is always an EOFError: Ran out of input error with differently-sized buffers (sometimes as small as a few bytes), and the entire cluster is running on a single machine.
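For what it's worth, the b'\x80\x04' payload in the log is just the pickle protocol-4 header with no body, so the deserialization failure itself can be reproduced locally with nothing but the standard library; a minimal sketch:

```python
import pickle

# b'\x80\x04' is the PROTO opcode followed by protocol number 4, with no
# payload after it. Unpickling this truncated stream raises the same
# "Ran out of input" EOFError that appears in the worker log.
try:
    pickle.loads(b'\x80\x04')
except EOFError as exc:
    print(exc)  # Ran out of input
```

This doesn't explain *why* the stream got truncated in transit, but it confirms the error message is exactly what pickle emits for a cut-off buffer.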

Ideally I'd like a solution to the actual problem, but ways to investigate the issue and an understanding of what could possibly be going wrong would also be appreciated. Right now I'm stuck without knowing how to fix the issue at hand.

Running client.get_versions(check=True) finishes without an error, and the problem persists after updating all packages (namely numpy, scipy, dask, dask-distributed, and cloudpickle).

Upvotes: 1

Views: 1817

Answers (1)

Stuart Berg

Reputation: 18141

The cloudpickle project (which dask uses) was recently patched to fix a problem that might be causing this error.

Some details are explained in this comment: https://github.com/ray-project/ray/issues/2685#issuecomment-423182347

...and more details can be found in the related issues and PRs in the cloudpickle GitHub repo.

FWIW, I encountered this error today (including the b'\x80\x04' part), and updating cloudpickle to 0.8.0 seems to have fixed it.
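As a quick sanity check, you can confirm the installed cloudpickle meets that 0.8.0 threshold. A minimal sketch; the version_tuple helper is mine (not part of dask or cloudpickle) and assumes a plain dotted version string:

```python
def version_tuple(v):
    # Parse "0.8.0" -> (0, 8, 0); assumes a plain dotted version string
    # with no pre-release suffixes.
    return tuple(int(part) for part in v.split(".")[:3])

try:
    import cloudpickle
    ok = version_tuple(cloudpickle.__version__) >= version_tuple("0.8.0")
    print("cloudpickle >= 0.8.0:", ok)
except ImportError:
    print("cloudpickle is not installed")
```

Remember to run the check (and the upgrade) in every environment the cluster uses, and restart the scheduler and workers afterwards so they all load the same version.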

Upvotes: 1
