pierre_j

Reputation: 983

Writing a dask dataframe to parquet: 'TypeError'

I am trying to use Dask to write Parquet files. The goal is to use its repartition feature, but it appears I am not even able to write out a simple Parquet file, before getting to the repartition step...

Here is the code I use to create a Parquet file with pyarrow, read it back with Dask, then write it out again.

import pandas as pd
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
import dask.dataframe as dd

file = 'example.parquet'
file_res = 'example_res.parquet'

# Generate a random df
df = pd.DataFrame(np.random.randint(100, size=(100000, 20)),
                  columns=list('ABCDEFGHIJKLMNOPQRST'))

# Write 1st parquet file with pyarrow
table = pa.Table.from_pandas(df)
pq.write_table(table, file, version='1.0')

# Read it back with Dask, and write it again
dd_df = dd.read_parquet(file)
dd_df.to_parquet(file_res)

The last writing step then fails with TypeError: expected list of bytes. Full traceback below:


  File "C:/Users/me/Documents/code/_draft/pyarrow_parquet_store.py", line 31, in <module>
    dd_df.to_parquet(file_res)

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\dataframe\core.py", line 4075, in to_parquet
    return to_parquet(self, path, *args, **kwargs)

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\dataframe\io\parquet\core.py", line 665, in to_parquet
    out = out.compute(**compute_kwargs)

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\base.py", line 279, in compute
    (result,) = compute(self, traverse=False, **kwargs)

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\base.py", line 567, in compute
    results = schedule(dsk, keys, **kwargs)

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\threaded.py", line 84, in get
    **kwargs

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\local.py", line 486, in get_async
    raise_exception(exc, tb)

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\local.py", line 316, in reraise
    raise exc

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\local.py", line 222, in execute_task
    result = _execute_task(task, data)

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\utils.py", line 30, in apply
    return func(*args, **kwargs)

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\dataframe\io\parquet\fastparquet.py", line 625, in write_partition
    fil, df, fmd.schema, compression=compression, fmd=fmd

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\fastparquet\writer.py", line 631, in make_part_file
    rg = make_row_group(f, data, schema, compression=compression)

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\fastparquet\writer.py", line 619, in make_row_group
    compression=comp)

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\fastparquet\writer.py", line 513, in write_column
    data, selement)

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\fastparquet\writer.py", line 254, in encode_plain
    return pack_byte_array(list(out))

  File "fastparquet\speedups.pyx", line 112, in fastparquet.speedups.pack_byte_array

TypeError: expected list of bytes

Thanks for any help. Best.

Upvotes: 3

Views: 2904

Answers (1)

mdurant

Reputation: 28684

The problem appears to be the index: it is stored as pure metadata (RangeIndex(start=0, stop=100000, step=1)), but Dask infers its dtype as "object" (i.e., strings or something more complex); hence the attempt to write the list of numbers as if they were strings.

Whilst this is a bug, here are some workarounds:

  • don't write the index: dd_df.to_parquet(file_res, write_index=False)
  • for a single partition like this, the fastparquet API without Dask works just fine
  • drop the index, or set a new index
  • set the index dtype explicitly
  • use the pyarrow engine: dd_df.to_parquet(file_res, engine="pyarrow")

Upvotes: 5
