Reputation: 2434
I created a parquet dataset by reading data into a pandas df, using get_dummies()
on the data, and writing it to a parquet file:
import pandas as pd

df = pd.read_sql(query, engine)
encoded = pd.get_dummies(df, columns=['account'])
encoded.to_parquet('encoded.parquet')
The pandas df was 2.7M rows and 4,000 columns. Next, I read the parquet data into a dask df and attempted to perform a groupby on it:
import dask.dataframe as dd
from dask.distributed import Client

c = Client()
df = dd.read_parquet('encoded.parquet')
result = c.compute(df.groupby(df.journal_entry).max())
The resulting df is 600,000 rows and 4,000 columns. I have 32 GB of RAM on my machine. After a short period of attempted computation, a MemoryError is raised. This is the traceback:
---------------------------------------------------------------------------
MemoryError Traceback (most recent call last)
<ipython-input-29-083067d43616> in <module>()
----> 1 result.result()
~\_installed\anaconda\lib\site-packages\distributed\client.py in result(self, timeout)
156 self._result, raiseit=False, callback_timeout=timeout)
157 if self.status == 'error':
--> 158 six.reraise(*result)
159 elif self.status == 'cancelled':
160 raise result
~\_installed\anaconda\lib\site-packages\six.py in reraise(tp, value, tb)
683 value = tp()
684 if value.__traceback__ is not tb:
--> 685 raise value.with_traceback(tb)
686 raise value
687
~\_installed\anaconda\lib\site-packages\zict\buffer.py in __setitem__()
80 self.fast[key] = value
81 else:
---> 82 self.slow[key] = value
83
84 def __delitem__(self, key):
~\_installed\anaconda\lib\site-packages\zict\func.py in __setitem__()
40
41 def __setitem__(self, key, value):
---> 42 self.d[key] = self.dump(value)
43
44 def __contains__(self, key):
~\_installed\anaconda\lib\site-packages\distributed\protocol\serialize.py in serialize_bytelist()
342
343 def serialize_bytelist(x):
--> 344 header, frames = serialize(x)
345 frames = frame_split_size(frames)
346 if frames:
~\_installed\anaconda\lib\site-packages\distributed\protocol\serialize.py in serialize()
136 if _find_lazy_registration(name):
137 return serialize(x) # recurse
--> 138 header, frames = {}, [pickle.dumps(x)]
139
140 return header, frames
~\_installed\anaconda\lib\site-packages\distributed\protocol\pickle.py in dumps()
49 except Exception:
50 try:
---> 51 return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
52 except Exception as e:
53 logger.info("Failed to serialize %s. Exception: %s", x, e)
~\_installed\anaconda\lib\site-packages\cloudpickle\cloudpickle.py in dumps()
898
899 cp = CloudPickler(file,protocol)
--> 900 cp.dump(obj)
901
902 return file.getvalue()
~\_installed\anaconda\lib\site-packages\cloudpickle\cloudpickle.py in dump()
232 self.inject_addons()
233 try:
--> 234 return Pickler.dump(self, obj)
235 except RuntimeError as e:
236 if 'recursion' in e.args[0]:
~\_installed\anaconda\lib\pickle.py in dump()
407 if self.proto >= 4:
408 self.framer.start_framing()
--> 409 self.save(obj)
410 self.write(STOP)
411 self.framer.end_framing()
~\_installed\anaconda\lib\pickle.py in save()
519
520 # Save the reduce() output and finally memoize the object
--> 521 self.save_reduce(obj=obj, *rv)
522
523 def persistent_id(self, obj):
~\_installed\anaconda\lib\site-packages\cloudpickle\cloudpickle.py in save_reduce()
784
785 if state is not None:
--> 786 save(state)
787 write(pickle.BUILD)
788
~\_installed\anaconda\lib\pickle.py in save()
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
~\_installed\anaconda\lib\pickle.py in save_dict()
819
820 self.memoize(obj)
--> 821 self._batch_setitems(obj.items())
822
823 dispatch[dict] = save_dict
~\_installed\anaconda\lib\pickle.py in _batch_setitems()
845 for k, v in tmp:
846 save(k)
--> 847 save(v)
848 write(SETITEMS)
849 elif n:
~\_installed\anaconda\lib\pickle.py in save()
519
520 # Save the reduce() output and finally memoize the object
--> 521 self.save_reduce(obj=obj, *rv)
522
523 def persistent_id(self, obj):
~\_installed\anaconda\lib\site-packages\cloudpickle\cloudpickle.py in save_reduce()
784
785 if state is not None:
--> 786 save(state)
787 write(pickle.BUILD)
788
~\_installed\anaconda\lib\pickle.py in save()
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
~\_installed\anaconda\lib\pickle.py in save_tuple()
749 write(MARK)
750 for element in obj:
--> 751 save(element)
752
753 if id(obj) in memo:
~\_installed\anaconda\lib\pickle.py in save()
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
~\_installed\anaconda\lib\pickle.py in save_list()
779
780 self.memoize(obj)
--> 781 self._batch_appends(obj)
782
783 dispatch[list] = save_list
~\_installed\anaconda\lib\pickle.py in _batch_appends()
803 write(MARK)
804 for x in tmp:
--> 805 save(x)
806 write(APPENDS)
807 elif n:
~\_installed\anaconda\lib\pickle.py in save()
519
520 # Save the reduce() output and finally memoize the object
--> 521 self.save_reduce(obj=obj, *rv)
522
523 def persistent_id(self, obj):
~\_installed\anaconda\lib\site-packages\cloudpickle\cloudpickle.py in save_reduce()
784
785 if state is not None:
--> 786 save(state)
787 write(pickle.BUILD)
788
~\_installed\anaconda\lib\pickle.py in save()
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
~\_installed\anaconda\lib\pickle.py in save_tuple()
749 write(MARK)
750 for element in obj:
--> 751 save(element)
752
753 if id(obj) in memo:
~\_installed\anaconda\lib\pickle.py in save()
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
~\_installed\anaconda\lib\pickle.py in save_bytes()
697 self.write(SHORT_BINBYTES + pack("<B", n) + obj)
698 elif n > 0xffffffff and self.proto >= 4:
--> 699 self.write(BINBYTES8 + pack("<Q", n) + obj)
700 else:
701 self.write(BINBYTES + pack("<I", n) + obj)
MemoryError:
In addition to this method, I tried skipping the parquet steps and directly converting the pandas df to a dask df. This did not run into an immediate memory issue, but it ran for an hour with 0% completed (according to dask diagnostics). What can I do to avoid this memory error?
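For completeness, the direct conversion looked roughly like this (the partition count shown here is just illustrative):
import dask.dataframe as dd
from dask.distributed import Client

c = Client()
ddf = dd.from_pandas(encoded, npartitions=16)  # encoded is the pandas df from above
result = c.compute(ddf.groupby(ddf.journal_entry).max())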
Upvotes: 0
Views: 696
Reputation: 28673
Dask needs to be able to load each whole partition into memory, and it also needs temporary working memory for the group-aggregate operation. In your case, the data has been written as a single partition, so the entire dataset must fit in memory at once.
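You can confirm this by checking how many row groups the file contains (a quick check, assuming pyarrow is installed; a single row group means dask sees a single partition):
import pyarrow.parquet as pq

pf = pq.ParquetFile('encoded.parquet')
print(pf.metadata.num_row_groups)  # likely 1: the whole dataset is one row group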
Assuming you are writing your data with fastparquet, you can tell pandas to write your data into partitions (row groups):
encoded.to_parquet('encoded.parquet', engine='fastparquet', row_group_offsets=200000)
With pyarrow, the syntax is a little more cumbersome:
import pyarrow
import pyarrow.parquet

schema = pyarrow.Table.from_pandas(encoded).schema
with pyarrow.parquet.ParquetWriter('encoded.parquet', schema) as writer:
    for i in range(0, len(encoded), 200000):
        writer.write_table(pyarrow.Table.from_pandas(encoded.iloc[i:i+200000]))
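Either way, once the file contains multiple row groups, dask should see one partition per row group; a quick way to verify after rewriting the file as above:
import dask.dataframe as dd

ddf = dd.read_parquet('encoded.parquet')
print(ddf.npartitions)  # should now be roughly len(encoded) / 200000 rather than 1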
The right row-group size to use will depend on the number of dask workers you have and the memory available to each.
This is important, because your column count is rather large, and the default partitioning (in fastparquet; there is none at all in pyarrow) is done by number of rows.
Upvotes: 1