cd98

Reputation: 3532

Using Dask to read Parquet files from Google Cloud Storage

I'm trying to use Dask to read and write from a Google Cloud Storage bucket. Using a bunch of CSV files works, but it is inconvenient (slower, no compression, and no ability to read only some columns), so I tried the Apache Parquet format instead.

The writing seems to work fine:

import pandas as pd
import dask.dataframe as dd

pandas_df = pd.DataFrame({'x': [2, 3, 2], 'y': [1, 0, 0]})
dask_df = dd.from_pandas(pandas_df, npartitions=2)
dask_df.to_parquet("gcs://my_google_bucket/test/")

But when I try to read it back

read_again_df = dd.read_parquet("gcs://my_google_bucket/test/") 

I get a NotImplementedError:

AttributeError                            Traceback (most recent call last)
~/miniconda3/envs/env1/lib/python3.6/site-packages/dask/bytes/core.py in get_pyarrow_filesystem(fs)
    520     try:
--> 521         return fs._get_pyarrow_filesystem()
    522     except AttributeError:

AttributeError: 'DaskGCSFileSystem' object has no attribute '_get_pyarrow_filesystem'

During handling of the above exception, another exception occurred:

NotImplementedError                       Traceback (most recent call last)
<ipython-input-42-ef1fc41d04d5> in <module>()
----> 1 read_again = dd.read_parquet("gcs://my_google_bucket/test/")

~/miniconda3/envs/env1/lib/python3.6/site-packages/dask/dataframe/io/parquet.py in read_parquet(path, columns, filters, categories, index, storage_options, engine, infer_divisions)
    991 
    992     return read(fs, fs_token, paths, columns=columns, filters=filters,
--> 993                 categories=categories, index=index, infer_divisions=infer_divisions)
    994 
    995 

~/miniconda3/envs/env1/lib/python3.6/site-packages/dask/dataframe/io/parquet.py in _read_pyarrow(fs, fs_token, paths, columns, filters, categories, index, infer_divisions)
    505         columns = list(columns)
    506 
--> 507     dataset = pq.ParquetDataset(paths, filesystem=get_pyarrow_filesystem(fs))
    508     if dataset.partitions is not None:
    509         partitions = [n for n in dataset.partitions.partition_names

~/miniconda3/envs/env1/lib/python3.6/site-packages/dask/bytes/core.py in get_pyarrow_filesystem(fs)
    522     except AttributeError:
    523         raise NotImplementedError("Using pyarrow with a %r "
--> 524                                   "filesystem object" % type(fs).__name__)

NotImplementedError: Using pyarrow with a 'DaskGCSFileSystem' filesystem object

I'm guessing this means that Dask still can't read Parquet files from Google Cloud Storage directly. Is there an indirect way of making this work, say, by using pyarrow?

What I want to keep is the ability to load data lazily and then use Dask to do data transformations.

Thanks!

Upvotes: 2

Views: 5635

Answers (1)

mdurant

Reputation: 28683

Dask can certainly read parquet from GCS with the fastparquet backend (engine='fastparquet'). Note that pyarrow will not have written the _metadata file that fastparquet expects, so you can either write your data with fastparquet, create the _metadata file from the existing data files using fastparquet, or pass a glob string pointing to all of the data files instead of the directory.
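
For example, a minimal sketch of the glob-string option, assuming the data files follow Dask's default part.*.parquet naming under the path from the question:

import dask.dataframe as dd

# Point fastparquet at the data files themselves rather than the directory,
# so it does not need a _metadata file
read_again_df = dd.read_parquet("gcs://my_google_bucket/test/part.*.parquet", engine='fastparquet')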

What you are doing ought to work with pyarrow too, since pyarrow can generally accept any Python file-like object, but in this case it appears to be trying to construct a pyarrow filesystem instead. The error you are seeing above is likely a bug and should be investigated.
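
As a rough illustration of the file-like-object route (not a fix for the bug above), pyarrow can read from an open GCS file directly via gcsfs. The bucket path and part-file name below are assumptions based on the example in the question, and this reads a single file eagerly rather than lazily through Dask:

import gcsfs
import pyarrow.parquet as pq

# Open one data file on GCS as a Python file-like object and hand it to pyarrow
fs = gcsfs.GCSFileSystem()
with fs.open("my_google_bucket/test/part.0.parquet") as f:
    table = pq.read_table(f)

pandas_back = table.to_pandas()  # materialise as a pandas DataFrame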

-edit-

According to comments from the OP, the following does work:

import pandas as pd
import dask.dataframe as dd

pandas_df = pd.DataFrame({'x': [2, 3, 2], 'y': [1, 0, 0]})
dask_df = dd.from_pandas(pandas_df, npartitions=2)
dask_df.to_parquet("gcs://my_bucket/test", engine='fastparquet')
read_again_df = dd.read_parquet("gcs://my_bucket/test/", engine='fastparquet')

Please note that, due to what appears to be a bug, dask_df.to_parquet() needs to be called with "gcs://my_bucket/test" (without the trailing "/"), otherwise dd.read_parquet() does not work.

Upvotes: 5
