Reputation: 983
I am trying to use Dask to write parquet files. My goal is to use its repartition feature, but it appears I am not even able to write out a simple parquet file, before ever getting to the repartition step...
Here is the code I use to create a parquet file with pyarrow, read it back with Dask, then write it out again.
import pandas as pd
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
import dask.dataframe as dd
file = 'example.parquet'
file_res = 'example_res.parquet'
# Generate a random df
df = pd.DataFrame(
    np.random.randint(100, size=(100000, 20)),
    columns=list('ABCDEFGHIJKLMNOPQRST'),
)
# Write 1st parquet file with pyarrow
table = pa.Table.from_pandas(df)
pq.write_table(table, file, version='1.0')
# Read it back with Dask, and write it again
dd_df = dd.read_parquet(file)
dd_df.to_parquet(file_res)
The last writing step then fails with TypeError: expected list of bytes. Full traceback below:
File "C:/Users/me/Documents/code/_draft/pyarrow_parquet_store.py", line 31, in <module>
dd_df.to_parquet(file_res)
File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\dataframe\core.py", line 4075, in to_parquet
return to_parquet(self, path, *args, **kwargs)
File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\dataframe\io\parquet\core.py", line 665, in to_parquet
out = out.compute(**compute_kwargs)
File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\base.py", line 279, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\base.py", line 567, in compute
results = schedule(dsk, keys, **kwargs)
File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\threaded.py", line 84, in get
**kwargs
File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\local.py", line 486, in get_async
raise_exception(exc, tb)
File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\local.py", line 316, in reraise
raise exc
File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\local.py", line 222, in execute_task
result = _execute_task(task, data)
File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\core.py", line 121, in _execute_task
return func(*(_execute_task(a, cache) for a in args))
File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\utils.py", line 30, in apply
return func(*args, **kwargs)
File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\dataframe\io\parquet\fastparquet.py", line 625, in write_partition
fil, df, fmd.schema, compression=compression, fmd=fmd
File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\fastparquet\writer.py", line 631, in make_part_file
rg = make_row_group(f, data, schema, compression=compression)
File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\fastparquet\writer.py", line 619, in make_row_group
compression=comp)
File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\fastparquet\writer.py", line 513, in write_column
data, selement)
File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\fastparquet\writer.py", line 254, in encode_plain
return pack_byte_array(list(out))
File "fastparquet\speedups.pyx", line 112, in fastparquet.speedups.pack_byte_array
TypeError: expected list of bytes
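For context, the step I eventually want to reach is a simple repartition before writing, something like this (just a sketch of the goal; npartitions=4 is an arbitrary example):
dd_df = dd.read_parquet(file)
dd_df = dd_df.repartition(npartitions=4)  # the step I actually care about
dd_df.to_parquet(file_res)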
Thanks for any help. Best.
Upvotes: 3
Views: 2904
Reputation: 28684
The problem appears to be the index: it is stored as pure metadata, RangeIndex(start=0, stop=100000, step=1), but Dask infers its dtype as "object" (i.e., strings or something more complex); hence the attempt to write the list of numbers as if they were byte strings.
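One quick way to see the mis-inference, reusing the variables from the question (the exact metadata layout may vary with your pyarrow version):
import pyarrow.parquet as pq
import dask.dataframe as dd
# The RangeIndex lives only in the pandas metadata of the file,
# not as a real column:
print(pq.read_schema(file).pandas_metadata['index_columns'])
# Dask still materializes an index, but guesses its dtype wrongly;
# fastparquet then tries to pack those integers as byte strings:
dd_df = dd.read_parquet(file)
print(dd_df.index.dtype)  # object, instead of int64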
Whilst this is a bug, here are some workarounds:
- skip writing the index: dd_df.to_parquet(file_res, write_index=False)
- use the pyarrow engine (for both reading and writing) instead of fastparquet: engine="pyarrow"
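For completeness, both workarounds in context (a sketch reusing the names from the question):
# Workaround 1: drop the index when writing with the default engine
dd_df.to_parquet(file_res, write_index=False)
# Workaround 2: use pyarrow on both sides, so the RangeIndex
# metadata round-trips without fastparquet's dtype guess
dd_df = dd.read_parquet(file, engine="pyarrow")
dd_df.to_parquet(file_res, engine="pyarrow")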
Upvotes: 5