Reputation: 568
I'm loading a large parquet dataframe using Dask but can't seem to do anything with it without the system crashing on me, or getting a million errors and no output.
The data weighs about 165 MB compressed, or about 13 GB once loaded in pandas (it fits comfortably in the 45 GB of RAM available).
import pandas as pd
df = pd.read_parquet('data_simulated.parquet')
df.memory_usage(deep=True).sum() * 1e-9
# returns 13.09
df.head()
# prints the head of the dataframe properly
Instead, if I use Dask:
from dask.distributed import Client
import dask.dataframe as dataframe
client = Client()
# prints: <Client: 'tcp://127.0.0.1:38576' processes=7 threads=28, memory=48.32 GB>
df = dataframe.read_parquet('data_simulated.parquet')
df.memory_usage(deep=True).sum().compute() * 1e-9
prints
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
[a large traceback]
KilledWorker: ("('series-groupby-sum-chunk-memory_usage-dc8dab46de985e36d76a85bf3abaccbf', 0)", <Worker 'tcp://127.0.0.1:36882', name: 2, memory: 0, processing: 1>)
The same happens if I try df.head(), df.set_index(...), or any other operation that actually computes anything on the dataframe. I've tried reducing the number of workers so that each has more memory, and I've also tried repartitioning the dataframe, but it fails with the same error. If I set the memory_limit on the client's LocalCluster to zero, the system just fully crashes. For reference, the reduced-worker attempt looked roughly like the sketch below (the exact worker count and limit are illustrative).
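from dask.distributed import Client

# Fewer workers, each with a larger slice of the 45 GB of RAM
# (the numbers here are just an example, not the exact values I used)
client = Client(n_workers=2, threads_per_worker=4, memory_limit='20GB')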
What am I doing wrong?
Edit: here's some extra info on the data (obtained by loading it with pandas):
In [2]: print(df.dtypes)
market_id uint32
choice_id uint64
attribute_1 bool
attribute_2 bool
attribute_3 bool
income float32
is_urban bool
distance float32
weight float32
quarter uint32
product_id int64
price float64
size float32
share float32
market_quarter int64
product_type object
outside_option int64
dtype: object
In [3]: print(df.shape)
(89429613, 17)
The product_type object column holds strings.
Upvotes: 1
Views: 1388
Reputation: 28673
Dask works by loading and processing your data chunk-wise. In the case of parquet, that chunking comes from the data files themselves: internally, parquet is organised into "row-groups", sets of rows that are meant to be read together.
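You can verify this directly from the parquet metadata, for example with pyarrow (using the file name from your question):

import pyarrow.parquet as pq

pf = pq.ParquetFile('data_simulated.parquet')
print(pf.metadata.num_row_groups)         # 1 means Dask gets a single, huge partition
print(pf.metadata.row_group(0).num_rows)  # rows in that one row-group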
It sounds like in this case the entire dataset consists of one row-group in one file. This means that Dask has no opportunity to split the data into chunks: you get one task, which puts the full memory pressure on a single worker (probably equal to the total data size plus some temporary values), and that worker has only been allocated a fraction of the total system memory. Hence the errors.
Note that you can turn off memory monitoring, to prevent workers from getting killed, either in the configuration or directly with keywords like memory_limit=0. That is reasonable here, since you know that only one worker will be doing the load.
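A minimal sketch of that, assuming you stick with the default LocalCluster:

from dask.distributed import Client

# memory_limit=0 disables the memory monitor, so the single loading task
# is not killed; only do this when you know the data fits in RAM, as here.
client = Client(n_workers=1, memory_limit=0)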
In some very specific situations (no nested/list/map types) it would be possible to split row-groups further, but the code for this does not exist, and it would be inefficient because of the compression and encoding of the data.
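If you control how the file is written, a simpler workaround is to rewrite it with several row-groups, so Dask has natural chunk boundaries. A sketch with pyarrow (the row_group_size is illustrative, and reading the whole table is fine here only because the data fits in memory):

import pyarrow.parquet as pq

table = pq.read_table('data_simulated.parquet')
# ~90M rows split into row-groups of 5M rows each gives Dask ~18 partitions
pq.write_table(table, 'data_simulated_chunked.parquet', row_group_size=5_000_000)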
Upvotes: 1