Reputation: 3532
I'm trying to use Dask to read and write from a Google bucket.
Using a bunch of csv files works, but is inconvenient (slower, no compression, and no way to read only selected columns), so I tried using the apache parquet format.
The writing seems to work fine:
import pandas as pd
import dask.dataframe as dd

pandas_df = pd.DataFrame({'x': [2, 3, 2], 'y': [1, 0, 0]})
dask_df = dd.from_pandas(pandas_df, npartitions=2)
dask_df.to_parquet("gcs://my_google_bucket/test/")
But when I try to read it back
read_again_df = dd.read_parquet("gcs://my_google_bucket/test/")
I get a NotImplementedError:
AttributeError Traceback (most recent call last)
~/miniconda3/envs/env1/lib/python3.6/site-packages/dask/bytes/core.py in get_pyarrow_filesystem(fs)
520 try:
--> 521 return fs._get_pyarrow_filesystem()
522 except AttributeError:
AttributeError: 'DaskGCSFileSystem' object has no attribute '_get_pyarrow_filesystem'
During handling of the above exception, another exception occurred:
NotImplementedError Traceback (most recent call last)
<ipython-input-42-ef1fc41d04d5> in <module>()
----> 1 read_again = dd.read_parquet("gcs://my_google_bucket/test/")
~/miniconda3/envs/env1/lib/python3.6/site-packages/dask/dataframe/io/parquet.py in read_parquet(path, columns, filters, categories, index, storage_options, engine, infer_divisions)
991
992 return read(fs, fs_token, paths, columns=columns, filters=filters,
--> 993 categories=categories, index=index, infer_divisions=infer_divisions)
994
995
~/miniconda3/envs/env1/lib/python3.6/site-packages/dask/dataframe/io/parquet.py in _read_pyarrow(fs, fs_token, paths, columns, filters, categories, index, infer_divisions)
505 columns = list(columns)
506
--> 507 dataset = pq.ParquetDataset(paths, filesystem=get_pyarrow_filesystem(fs))
508 if dataset.partitions is not None:
509 partitions = [n for n in dataset.partitions.partition_names
~/miniconda3/envs/env1/lib/python3.6/site-packages/dask/bytes/core.py in get_pyarrow_filesystem(fs)
522 except AttributeError:
523 raise NotImplementedError("Using pyarrow with a %r "
--> 524 "filesystem object" % type(fs).__name__)
NotImplementedError: Using pyarrow with a 'DaskGCSFileSystem' filesystem object
I'm guessing this means that dask still can't read parquet files from Google Cloud Storage directly.
Is there any indirect way of making this work, say by using pyarrow?
What I want to keep is the ability to lazily load data and then use dask to do data transformations.
Thanks!
Upvotes: 2
Views: 5635
Reputation: 28683
Dask can certainly read parquet from GCS with the fastparquet backend (engine='fastparquet'). Note that pyarrow will not have made the _metadata file that fastparquet expects, so you can either write your data with fastparquet, create the _metadata file from the existing data files using fastparquet, or pass a glob-string pointing to all of the data files instead of the directory, as sketched below.
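For example, something along these lines should work (untested sketch; the bucket path and file pattern are illustrative, not what dask necessarily wrote for you):
import dask.dataframe as dd
import gcsfs
import fastparquet

# option 1: avoid the need for _metadata by globbing the data files directly
read_again_df = dd.read_parquet("gcs://my_google_bucket/test/*.parquet", engine='fastparquet')

# option 2: build a _metadata file from the existing data files with fastparquet
fs = gcsfs.GCSFileSystem()
files = fs.glob("my_google_bucket/test/*.parquet")  # placeholder path
fastparquet.writer.merge(files, open_with=fs.open)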
What you are doing ought to work with pyarrow too, as pyarrow can generally accept any python file-like object, but in this case it appears to be trying to make a pyarrow filesystem instead. The error you are seeing above is likely a bug and should be investigated.
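To illustrate the file-like-object route (a rough sketch; the bucket and file name are placeholders, and this reads a single data file eagerly rather than through dask):
import gcsfs
import pyarrow.parquet as pq

fs = gcsfs.GCSFileSystem()
# hand pyarrow an open GCS file object instead of a filesystem
with fs.open("my_google_bucket/test/part.0.parquet") as f:
    table = pq.read_table(f)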
-edit-
According to comments from the OP, the following does work:
import pandas as pd
import dask.dataframe as dd

pandas_df = pd.DataFrame({'x': [2, 3, 2], 'y': [1, 0, 0]})
dask_df = dd.from_pandas(pandas_df, npartitions=2)
dask_df.to_parquet("gcs://my_bucket/test", engine='fastparquet')
read_again_df = dd.read_parquet("gcs://my_bucket/test/", engine='fastparquet')
Please note that, due to what appears to be a bug, dask_df.to_parquet() needs to be called with "gcs://my_bucket/test" (without the trailing "/"), otherwise dd.read_parquet() doesn't work.
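From there, the lazy workflow you describe works as usual; a small sketch using the toy columns above (nothing is computed until .compute() is called):
# read only the 'x' column and aggregate lazily
x_only = dd.read_parquet("gcs://my_bucket/test/", columns=['x'], engine='fastparquet')
result = x_only.x.mean().compute()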
Upvotes: 5