Reputation: 143
I am using dask to write and read parquet. I write with the fastparquet engine and read with the pyarrow engine. My worker has 1 GB of memory. With fastparquet the memory usage is fine, but when I switch to pyarrow it blows up and causes the worker to restart. I have a reproducible example below which fails with pyarrow on a worker with a 1 GB memory limit. In reality my dataset is much bigger than this. The only reason I use pyarrow is the speed boost it gives me while scanning compared to fastparquet (somewhere around 7x-8x).
dask : 0.17.1
pyarrow : 0.9.0.post1
fastparquet : 0.1.3
import dask.dataframe as dd
import numpy as np
import pandas as pd
size = 9900000
tmpdir = '/tmp/test/outputParquet1'
d = {'a': np.random.normal(0, 0.3, size=size).cumsum() + 50,
     'b': np.random.choice(['A', 'B', 'C'], size=size),
     'c': np.random.choice(['D', 'E', 'F'], size=size),
     'd': np.random.normal(0, 0.4, size=size).cumsum() + 50,
     'e': np.random.normal(0, 0.5, size=size).cumsum() + 50,
     'f': np.random.normal(0, 0.6, size=size).cumsum() + 50,
     'g': np.random.normal(0, 0.7, size=size).cumsum() + 50}
df = dd.from_pandas(pd.DataFrame(d), 200)
df.to_parquet(tmpdir, compression='snappy', write_index=True,
              engine='fastparquet')
#engine = 'pyarrow' #fails due to worker restart
engine = 'fastparquet' #works fine
df_partitioned = dd.read_parquet(tmpdir + "/*.parquet", engine=engine)
print(df_partitioned.count().compute())
df_partitioned.query("b=='A'").count().compute()
Edit: My original setup has Spark jobs running that write data in parallel into partitions using fastparquet, so the metadata file is created in the innermost partition rather than in the parent directory. Hence I use glob paths instead of the parent directory (fastparquet is much faster with a parent-directory read, whereas pyarrow wins when scanning with a glob path).
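For reference, the two read styles compare roughly like this (a sketch reusing the imports and tmpdir from the example above):
# parent-directory read: fastparquet can pick up a top-level _metadata file here
df_dir = dd.read_parquet(tmpdir, engine='fastparquet')
# glob-path read: lists the data files directly, no top-level _metadata needed
df_glob = dd.read_parquet(tmpdir + "/*.parquet", engine='pyarrow')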
Upvotes: 4
Views: 4271
Reputation: 428
You could also filter as you load the data, e.g. by a specific column:
df = dd.read_parquet('/path/to/*.parquet', engine='fastparquet', filters=[(COLUMN, 'operation', 'SOME_VALUE')])
Imagine operations like ==, >, <, and so on.
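For example, applied to the dataset above, a filter on column b might look like this (a sketch; it relies on the parquet files carrying row-group statistics for b):
import dask.dataframe as dd
# only row groups whose statistics can contain b == 'A' are read
df = dd.read_parquet('/tmp/test/outputParquet1/*.parquet',
                     engine='fastparquet',
                     filters=[('b', '==', 'A')])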
Upvotes: 0
Reputation: 28684
Some timing results on my non-memory-restricted system
With your example data
In [17]: df_partitioned = dd.read_parquet(tmpdir, engine='fastparquet')
In [18]: %timeit df_partitioned.count().compute()
2.47 s ± 114 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
In [19]: df_partitioned = dd.read_parquet(tmpdir, engine='pyarrow')
In [20]: %timeit df_partitioned.count().compute()
1.93 s ± 96.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
With columns b and c converted to categorical before writing
In [30]: df_partitioned = dd.read_parquet(tmpdir, engine='fastparquet')
In [31]: %timeit df_partitioned.count().compute()
1.25 s ± 83.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
In [32]: df_partitioned = dd.read_parquet(tmpdir, engine='pyarrow')
In [33]: %timeit df_partitioned.count().compute()
1.82 s ± 63 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
With fastparquet direct, single-threaded
In [36]: %timeit fastparquet.ParquetFile(tmpdir).to_pandas().count()
1.82 s ± 19 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
With 20 partitions instead of 200 (fastparquet, categories)
In [42]: %timeit df_partitioned.count().compute()
863 ms ± 78.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
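For reference, the categorical conversion and the coarser partitioning used in these timings could be produced from the question's df roughly like this (a sketch; categorize makes a pass over the data to discover the categories):
# convert the low-cardinality string columns and write fewer, larger partitions
df_cat = df.categorize(columns=['b', 'c']).repartition(npartitions=20)
df_cat.to_parquet(tmpdir, compression='snappy', write_index=True,
                  engine='fastparquet')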
Upvotes: 1
Reputation: 57301
I recommend selecting the columns you need in the read_parquet call:
df = dd.read_parquet('/path/to/*.parquet', engine='pyarrow', columns=['b'])
This lets you efficiently read only the columns you need rather than all of the columns at once.
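Combined with the query from the question, that might look like this (a sketch reading just column b before filtering):
df_b = dd.read_parquet(tmpdir + "/*.parquet", engine='pyarrow', columns=['b'])
df_b.query("b == 'A'").count().compute()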
Upvotes: 0