OverflowingTheGlass

Reputation: 2434

Dask Memory Error Grouping DF From Parquet Data

I created a parquet dataset by reading data into a pandas df, using get_dummies() on the data, and writing it to a parquet file:

df = pd.read_sql(query, engine)
encoded = pd.get_dummies(df, columns=['account'])
encoded.to_parquet('encoded.parquet')

The pandas df was 2.7M rows and 4,000 columns. Next, I read the parquet data into a dask df and attempted to perform a groupby on it:

from dask.distributed import Client
import dask.dataframe as dd

c = Client()
df = dd.read_parquet('encoded.parquet')
result = c.compute(df.groupby(df.journal_entry).max())

The resulting df should be 600,000 rows and 4,000 columns. I have 32 GB of RAM on my machine. After a short period of attempted computation, a MemoryError is raised. This is the traceback:

---------------------------------------------------------------------------
MemoryError                               Traceback (most recent call last)
<ipython-input-29-083067d43616> in <module>()
----> 1 result.result()

~\_installed\anaconda\lib\site-packages\distributed\client.py in result(self, timeout)
    156                       self._result, raiseit=False, callback_timeout=timeout)
    157         if self.status == 'error':
--> 158             six.reraise(*result)
    159         elif self.status == 'cancelled':
    160             raise result

~\_installed\anaconda\lib\site-packages\six.py in reraise(tp, value, tb)
    683             value = tp()
    684         if value.__traceback__ is not tb:
--> 685             raise value.with_traceback(tb)
    686         raise value
    687 

~\_installed\anaconda\lib\site-packages\zict\buffer.py in __setitem__()
     80             self.fast[key] = value
     81         else:
---> 82             self.slow[key] = value
     83 
     84     def __delitem__(self, key):

~\_installed\anaconda\lib\site-packages\zict\func.py in __setitem__()
     40 
     41     def __setitem__(self, key, value):
---> 42         self.d[key] = self.dump(value)
     43 
     44     def __contains__(self, key):

~\_installed\anaconda\lib\site-packages\distributed\protocol\serialize.py in serialize_bytelist()
    342 
    343 def serialize_bytelist(x):
--> 344     header, frames = serialize(x)
    345     frames = frame_split_size(frames)
    346     if frames:

~\_installed\anaconda\lib\site-packages\distributed\protocol\serialize.py in serialize()
    136         if _find_lazy_registration(name):
    137             return serialize(x)  # recurse
--> 138         header, frames = {}, [pickle.dumps(x)]
    139 
    140     return header, frames

~\_installed\anaconda\lib\site-packages\distributed\protocol\pickle.py in dumps()
     49     except Exception:
     50         try:
---> 51             return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
     52         except Exception as e:
     53             logger.info("Failed to serialize %s. Exception: %s", x, e)

~\_installed\anaconda\lib\site-packages\cloudpickle\cloudpickle.py in dumps()
    898 
    899     cp = CloudPickler(file,protocol)
--> 900     cp.dump(obj)
    901 
    902     return file.getvalue()

~\_installed\anaconda\lib\site-packages\cloudpickle\cloudpickle.py in dump()
    232         self.inject_addons()
    233         try:
--> 234             return Pickler.dump(self, obj)
    235         except RuntimeError as e:
    236             if 'recursion' in e.args[0]:

~\_installed\anaconda\lib\pickle.py in dump()
    407         if self.proto >= 4:
    408             self.framer.start_framing()
--> 409         self.save(obj)
    410         self.write(STOP)
    411         self.framer.end_framing()

~\_installed\anaconda\lib\pickle.py in save()
    519 
    520         # Save the reduce() output and finally memoize the object
--> 521         self.save_reduce(obj=obj, *rv)
    522 
    523     def persistent_id(self, obj):

~\_installed\anaconda\lib\site-packages\cloudpickle\cloudpickle.py in save_reduce()
    784 
    785         if state is not None:
--> 786             save(state)
    787             write(pickle.BUILD)
    788 

~\_installed\anaconda\lib\pickle.py in save()
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

~\_installed\anaconda\lib\pickle.py in save_dict()
    819 
    820         self.memoize(obj)
--> 821         self._batch_setitems(obj.items())
    822 
    823     dispatch[dict] = save_dict

~\_installed\anaconda\lib\pickle.py in _batch_setitems()
    845                 for k, v in tmp:
    846                     save(k)
--> 847                     save(v)
    848                 write(SETITEMS)
    849             elif n:

~\_installed\anaconda\lib\pickle.py in save()
    519 
    520         # Save the reduce() output and finally memoize the object
--> 521         self.save_reduce(obj=obj, *rv)
    522 
    523     def persistent_id(self, obj):

~\_installed\anaconda\lib\site-packages\cloudpickle\cloudpickle.py in save_reduce()
    784 
    785         if state is not None:
--> 786             save(state)
    787             write(pickle.BUILD)
    788 

~\_installed\anaconda\lib\pickle.py in save()
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

~\_installed\anaconda\lib\pickle.py in save_tuple()
    749         write(MARK)
    750         for element in obj:
--> 751             save(element)
    752 
    753         if id(obj) in memo:

~\_installed\anaconda\lib\pickle.py in save()
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

~\_installed\anaconda\lib\pickle.py in save_list()
    779 
    780         self.memoize(obj)
--> 781         self._batch_appends(obj)
    782 
    783     dispatch[list] = save_list

~\_installed\anaconda\lib\pickle.py in _batch_appends()
    803                 write(MARK)
    804                 for x in tmp:
--> 805                     save(x)
    806                 write(APPENDS)
    807             elif n:

~\_installed\anaconda\lib\pickle.py in save()
    519 
    520         # Save the reduce() output and finally memoize the object
--> 521         self.save_reduce(obj=obj, *rv)
    522 
    523     def persistent_id(self, obj):

~\_installed\anaconda\lib\site-packages\cloudpickle\cloudpickle.py in save_reduce()
    784 
    785         if state is not None:
--> 786             save(state)
    787             write(pickle.BUILD)
    788 

~\_installed\anaconda\lib\pickle.py in save()
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

~\_installed\anaconda\lib\pickle.py in save_tuple()
    749         write(MARK)
    750         for element in obj:
--> 751             save(element)
    752 
    753         if id(obj) in memo:

~\_installed\anaconda\lib\pickle.py in save()
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

~\_installed\anaconda\lib\pickle.py in save_bytes()
    697             self.write(SHORT_BINBYTES + pack("<B", n) + obj)
    698         elif n > 0xffffffff and self.proto >= 4:
--> 699             self.write(BINBYTES8 + pack("<Q", n) + obj)
    700         else:
    701             self.write(BINBYTES + pack("<I", n) + obj)

MemoryError: 

In addition to this method, I tried skipping the parquet step and converting the pandas df directly to a dask df. That did not hit an immediate memory error, but it ran for an hour with 0% completed (according to the dask diagnostics). What can I do to avoid this memory error?
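The direct conversion was along these lines (a sketch; the npartitions value here is arbitrary, not necessarily the one I used):

import dask.dataframe as dd

# Sketch of the direct pandas -> dask path; npartitions is an arbitrary choice
ddf = dd.from_pandas(encoded, npartitions=16)
result = c.compute(ddf.groupby(ddf.journal_entry).max())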

Upvotes: 0

Views: 696

Answers (1)

mdurant

Reputation: 28673

Dask needs to be able to load whole partitions into memory, and it also needs temporary memory for the group-aggregate operations. In your case, the data has been written as a single partition, so the entire frame (plus the aggregation's intermediates) has to fit in memory at once.
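A quick way to confirm this (a minimal sketch; the file name is the one from the question):

import dask.dataframe as dd

df = dd.read_parquet('encoded.parquet')
print(df.npartitions)  # 1 means the whole dataset is a single chunk that must fit in memory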

Assuming you are writing your data with the fastparquet engine, you can tell pandas to split the output into row groups, which dask will read as separate partitions:

encoded.to_parquet('encoded.parquet', engine='fastparquet', row_group_offsets=200000)

With pyarrow, the syntax is a little more cumbersome:

import pyarrow
import pyarrow.parquet

# Write the frame in 200,000-row chunks so each chunk becomes its own row group
schema = pyarrow.Table.from_pandas(encoded).schema
with pyarrow.parquet.ParquetWriter('encoded.parquet', schema) as writer:
    for i in range(0, len(encoded), 200000):
        writer.write_table(pyarrow.Table.from_pandas(encoded.iloc[i:i + 200000]))

The right number to use will depend on the number of dask workers you have.

This is important because your column count is rather large, and the default partitioning (in fastparquet; pyarrow applies none at all) is based on the number of rows, not on memory footprint, so a very wide frame can end up with much larger partitions than you expect.
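A rough starting point for choosing that number (a sketch, not part of the original answer; n_workers and the partitions-per-worker target are assumptions):

n_rows = len(encoded)            # ~2.7M rows in the question
n_workers = 8                    # however many workers your Client starts
partitions_per_worker = 4        # a few partitions per worker is a common rule of thumb

chunk_rows = max(1, n_rows // (n_workers * partitions_per_worker))
encoded.to_parquet('encoded.parquet', engine='fastparquet',
                   row_group_offsets=chunk_rows)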

Upvotes: 1
