Reputation: 4005
I am losing the index column after I use pivot_table on a Dask DataFrame and save the data to a Parquet file.
import dask.dataframe as dd
import pandas as pd
df = pd.DataFrame()
df["Index"] = [1, 2, 3, 1, 2, 3]
df["Field"] = ["A", "A", "A", "B", "B", "B"]
df["Value"] = [10, 20, 30, 100, 120, 130]
df
My dataframe:
   Index Field  Value
0      1     A     10
1      2     A     20
2      3     A     30
3      1     B    100
4      2     B    120
5      3     B    130
Dask code:
ddf = dd.from_pandas(df, 2)
ddf = ddf.categorize("Field")
ddf = ddf.pivot_table(values="Value", index="Index", columns="Field")
dd.to_parquet("1.parq", ddf)
dd.read_parquet("1.parq").compute()
This gives an error:
ValueError: Multiple possible indexes exist: ['A', 'B']. Please select one with index='index-name'
I can select A or B as the index, but then the Index column itself is missing.
I tried
dd.to_parquet("1.parq", ddf, write_index=True)
but it gives me the following error:
TypeError: cannot insert an item into a CategoricalIndex that is not already an existing category
Can someone help me save the table, including the "Index" column, to a Parquet file?
PS:
ddf.pivot_table(values="Value", index="Index", columns="Field").compute()
gives the expected result:
Field     A      B
Index
1      10.0  100.0
2      20.0  120.0
3      30.0  130.0
And using Pandas is not a solution, because my data is 20 GB.
EDIT:
I tried
ddf.columns = list(ddf.columns)
dd.to_parquet("1.parq", ddf, write_index=True)
And it gives me a new error:
dask.async.TypeError: expected list of bytes
Google suggests that these kinds of errors come from the Tornado asynchronous library.
Upvotes: 4
Views: 4355
Reputation: 28673
There are two problems here:
First, pivot_table produces a column index which is categorical, because you made the original column "Field" categorical. Writing the index to Parquet calls reset_index on the dataframe, and pandas cannot insert a new label into a categorical column index. You can avoid this with ddf.columns = list(ddf.columns), which converts the column labels back into a plain index.
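Here is a minimal pandas sketch of that first failure (standalone, with illustrative names; whether reset_index actually raises depends on your pandas version, older releases did):

import pandas as pd

# Columns as a CategoricalIndex, like the result of pivot_table on a categorized column
pdf = pd.DataFrame([[10.0, 100.0]],
                   columns=pd.CategoricalIndex(["A", "B"], name="Field"))
pdf.index.name = "Index"
# pdf.reset_index() must insert "Index" into the categorical column index, which raises:
# TypeError: cannot insert an item into a CategoricalIndex that is not already an existing category
pdf.columns = list(pdf.columns)  # back to a plain Index of strings
pdf = pdf.reset_index()          # now succeeds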
Second, the index column has object dtype but actually contains integers. The Parquet writer expects object columns to hold strings or bytes (hence the "expected list of bytes" error), so you should convert it to an integer dtype.
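The dtype issue in miniature (a sketch of just the conversion, not the full pipeline):

import pandas as pd

s = pd.Series([1, 2, 3], dtype=object)  # integers hidden behind object dtype
s = s.astype("int64")                   # a real integer column the Parquet writer can encode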
The whole block now looks like:
ddf = dd.from_pandas(df, 2)
ddf = ddf.categorize("Field")
ddf = ddf.pivot_table(values="Value", index="Index", columns="Field")
ddf.columns = list(ddf.columns)  # plain column index instead of a categorical one
ddf = ddf.reset_index()          # move the old index into an ordinary column
ddf['index'] = ddf['index'].astype('int64')  # cast the object column to int64 (bracket access: .index is the dataframe's index, not this column)
dd.to_parquet("1.parq", ddf)
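Reading the file back the same way as before should now work, with the former index available as the "index" column:

dd.read_parquet("1.parq").compute()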
Upvotes: 5