Reputation: 141
I have some code where I convert a pandas dataframe into a dask dataframe and apply some operations to the rows. The code used to work just fine, but it now crashes with what looks like an internal dask error. Does anyone know what the issue is?
Minimal example:
import pandas as pd
import numpy as np
import dask.dataframe as dd
x = pd.DataFrame(np.ones((4, 2)), columns=['a', 'b'])
df = dd.from_pandas(x, npartitions=2)
df.compute()
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-29-2d44a675e56f> in <module>
3 x = pd.DataFrame(np.ones((4, 2)),columns=['a', 'b'])
4 df = dd.from_pandas(x, npartitions=2)
----> 5 df.compute()
~/miniconda3/envs/research_env/lib/python3.9/site-packages/dask/base.py in compute(self, **kwargs)
282 dask.base.compute
283 """
--> 284 (result,) = compute(self, traverse=False, **kwargs)
285 return result
286
~/miniconda3/envs/research_env/lib/python3.9/site-packages/dask/base.py in compute(*args, **kwargs)
564 postcomputes.append(x.__dask_postcompute__())
565
--> 566 results = schedule(dsk, keys, **kwargs)
567 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
568
~/miniconda3/envs/research_env/lib/python3.9/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2652 Client.compute : Compute asynchronous collections
2653 """
-> 2654 futures = self._graph_to_futures(
2655 dsk,
2656 keys=set(flatten([keys])),
~/miniconda3/envs/research_env/lib/python3.9/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, workers, allow_other_workers, priority, user_priority, resources, retries, fifo_timeout, actors)
2579 # Pack the high level graph before sending it to the scheduler
2580 keyset = set(keys)
-> 2581 dsk = dsk.__dask_distributed_pack__(self, keyset, annotations)
2582
2583 # Create futures before sending graph (helps avoid contention)
TypeError: __dask_distributed_pack__() takes 3 positional arguments but 4 were given
Upvotes: 3
Views: 1226
Reputation: 938
I had this error after creating a new Python env and installing dask with conda install dask. I then uninstalled it and reinstalled with conda install dask distributed -c conda-forge (as recommended in the dask docs: https://distributed.dask.org/en/latest/install.html). After that, the error disappeared.
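As a quick sanity check before reinstalling anything, you can confirm whether your dask and distributed versions actually match; a mismatch between the two releases is what triggers the __dask_distributed_pack__ signature error above. This is a minimal sketch (not part of the original answer) using only the standard library's importlib.metadata; the helper name check_match is my own:

```python
from importlib.metadata import version, PackageNotFoundError

def check_match(pkgs=("dask", "distributed")):
    """Return a dict mapping each package name to its installed
    version string, or None if the package is not installed."""
    found = {}
    for name in pkgs:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found

versions = check_match()
print(versions)
if len(set(versions.values())) > 1:
    print("dask and distributed versions differ -- upgrade or pin them together")
```

If the two versions printed are not identical (e.g. dask=2021.4.0 next to distributed=2021.5.0, as in the answers below), aligning them should make the error go away.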
Upvotes: 0
Reputation: 21
I had the same thing happen recently with a conda env I created. I discovered it had been created with distributed=2021.5.0 and dask=2021.4.0. I downgraded distributed while in the env:
conda install distributed=2021.4.0
No more error. :)
Upvotes: 2
Reputation: 2462
As @SultanOrazbayev mentioned, the problem is that distributed was not updated along with dask. Running the following line after installing all packages resolves the issue:
python -m pip install "dask[distributed]" --upgrade
Upvotes: 2
Reputation: 16551
The code works fine on my machine (macOS Big Sur) with:
python=3.8.10
pandas=1.2.4
dask=2021.5.0
distributed=2021.5.0
numpy=1.20.3
Here's the exact code I ran on my machine:
import pandas as pd
import numpy as np
import dask.dataframe as dd
x = pd.DataFrame(np.ones((4, 2)), columns=['a', 'b'])
df = dd.from_pandas(x, npartitions=2)
df.compute()
# a b
# 0 1.0 1.0
# 1 1.0 1.0
# 2 1.0 1.0
# 3 1.0 1.0
Perhaps re-installing the modules will help. Sometimes it helps to re-create the environment from scratch, because distributed is sometimes not updated at the same time as dask.
Upvotes: 2