Patrick

Reputation: 141

TypeError: __dask_distributed_pack__() takes 3 positional arguments but 4 were given

I have some code where I convert a pandas dataframe into a dask dataframe and apply some operations to the rows. The code used to work just fine, but it now crashes with what looks like an internal dask error. Does anyone know what the issue is?

Minimal example:

import pandas as pd
import numpy as np
import dask.dataframe as dd

x = pd.DataFrame(np.ones((4, 2)),columns=['a', 'b'])
df = dd.from_pandas(x, npartitions=2)
df.compute()

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-29-2d44a675e56f> in <module>
      3 x = pd.DataFrame(np.ones((4, 2)),columns=['a', 'b'])
      4 df = dd.from_pandas(x, npartitions=2)
----> 5 df.compute()

~/miniconda3/envs/research_env/lib/python3.9/site-packages/dask/base.py in compute(self, **kwargs)
    282         dask.base.compute
    283         """
--> 284         (result,) = compute(self, traverse=False, **kwargs)
    285         return result
    286 

~/miniconda3/envs/research_env/lib/python3.9/site-packages/dask/base.py in compute(*args, **kwargs)
    564         postcomputes.append(x.__dask_postcompute__())
    565 
--> 566     results = schedule(dsk, keys, **kwargs)
    567     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    568 

~/miniconda3/envs/research_env/lib/python3.9/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2652         Client.compute : Compute asynchronous collections
   2653         """
-> 2654         futures = self._graph_to_futures(
   2655             dsk,
   2656             keys=set(flatten([keys])),

~/miniconda3/envs/research_env/lib/python3.9/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, workers, allow_other_workers, priority, user_priority, resources, retries, fifo_timeout, actors)
   2579             # Pack the high level graph before sending it to the scheduler
   2580             keyset = set(keys)
-> 2581             dsk = dsk.__dask_distributed_pack__(self, keyset, annotations)
   2582 
   2583             # Create futures before sending graph (helps avoid contention)

TypeError: __dask_distributed_pack__() takes 3 positional arguments but 4 were given

Upvotes: 3

Views: 1226

Answers (4)

Pan

Reputation: 938

I had this error after creating a new Python env and installing dask with a plain conda install dask. I then uninstalled it and reinstalled with

conda install dask distributed -c conda-forge

as mentioned in the dask docs (https://distributed.dask.org/en/latest/install.html). After that, the error disappeared.

Upvotes: 0

Dr Detroit

Reputation: 21

I had the same thing happen recently with a conda env I created. I discovered it had been created with distributed=2021.5.0 and dask=2021.4.0. While in the env, I downgraded distributed:

conda install distributed=2021.4.0

No more error. :)
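If you want to confirm the mismatch before touching the environment, a quick check (a minimal sketch; it only assumes both packages can be imported) is to compare the reported versions:

import dask
import distributed

# dask and distributed are released together, so their versions should normally match;
# a pairing like dask=2021.4.0 with distributed=2021.5.0 is the kind of mismatch that
# triggers the __dask_distributed_pack__() signature error above.
print("dask:       ", dask.__version__)
print("distributed:", distributed.__version__)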

Upvotes: 2

Naeem Khoshnevis

Reputation: 2462

As @SultanOrazbayev mentioned, the distributed package had not been updated along with dask. Running the following line after installing all packages resolves the issue.

python -m pip install "dask[distributed]" --upgrade

Upvotes: 2

SultanOrazbayev

Reputation: 16551

The code works fine on my machine (MacOS Big Sur) with:

python=3.8.10
pandas=1.2.4
dask=2021.5.0
distributed=2021.5.0
numpy=1.20.3

Here's the exact code I ran on my machine:

import pandas as pd
import numpy as np
import dask.dataframe as dd
x = pd.DataFrame(np.ones((4, 2)),columns=['a', 'b'])
df = dd.from_pandas(x, npartitions=2)
df.compute()
#      a    b
# 0  1.0  1.0
# 1  1.0  1.0
# 2  1.0  1.0
# 3  1.0  1.0

Perhaps re-installing the modules will help. Sometimes it helps to re-create the environment from scratch, because distributed is not always updated at the same time as dask.
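If you are connecting to an existing cluster rather than a fresh local one, a related sanity check (a sketch only; the scheduler address below is just a placeholder) is to ask the client to compare package versions across the client, the scheduler and the workers:

from distributed import Client

client = Client()  # or Client("tcp://<scheduler-address>:8786") for a remote cluster

# get_versions() collects the versions of key packages (dask, distributed, pandas,
# numpy, ...) as seen by the client, the scheduler and each worker;
# with check=True it raises if the required packages do not match.
client.get_versions(check=True)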

Upvotes: 2
