Reputation: 11660
I have an m4.4xlarge (64 GB RAM) EC2 box. I'm running dask with pandas, and I get the following memory error.
I get this after about 24 hours of running, which is approximately how long the task should take to complete, so I'm not sure whether the error is due to insufficient RAM, insufficient disk space (at the end of the script I call DF.to_csv() to write the large DF to disk), or some internal pandas/numpy memory limitation.
raise(remote_exception(res, tb))
dask.async.MemoryError:
Traceback
---------
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/dask/async.py", line 267, in execute_task
result = _execute_task(task, data)
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/dask/async.py", line 248, in _execute_task
args2 = [_execute_task(a, cache) for a in args]
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/dask/async.py", line 249, in _execute_task
return func(*args2)
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/frame.py", line 4061, in apply
return self._apply_standard(f, axis, reduce=reduce)
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/frame.py", line 4179, in _apply_standard
result = result._convert(datetime=True, timedelta=True, copy=False)
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/generic.py", line 3004, in _convert
copy=copy)).__finalize__(self)
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 2941, in convert
return self.apply('convert', **kwargs)
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 2901, in apply
bm._consolidate_inplace()
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 3278, in _consolidate_inplace
self.blocks = tuple(_consolidate(self.blocks))
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 4269, in _consolidate
_can_consolidate=_can_consolidate)
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 4289, in _merge_blocks
new_values = _vstack([b.values for b in blocks], dtype)
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 4335, in _vstack
return np.vstack(to_stack)
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/numpy/core/shape_base.py", line 230, in vstack
return _nx.concatenate([atleast_2d(_m) for _m in tup], 0)
Update:
Some additional information in light of MRocklin's answer.
Here is how I execute the process:
def dask_stats_calc(dfpath, v1, v2, v3..., get):
    dfpath_ddf = dd.from_pandas(dfpath, npartitions=16, sort=False)
    return dfpath_ddf.apply(calculate_stats, axis=1, args=(dfdaily, v1, v2, v3...)).compute(get=get).stack().reset_index(drop=True)

f_threaded = partial(dask_stats_calc, dfpath, v1, v2, v3..., multiprocessing.get)
f_threaded()
Now, dfpath is a DataFrame with 1.4 million rows, so dfpath_ddf.apply() runs over 1.4 million rows. Only once the entire dfpath_ddf.apply() is complete does the df.to_csv() happen, but as you said, it's better to write to disk periodically.
The question is: how do I implement something like a periodic write to disk, say every 200k rows? I guess I could break dfpath_ddf up into 200k-row chunks (or something similar) and run each sequentially, as in the sketch below?
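For reference, here is a rough, untested sketch of what I have in mind. It assumes the same calculate_stats, dfdaily, and multiprocessing.get as above (the argument list is shortened to v1, v2, v3 for readability), and that appending the results to a single CSV sequentially is acceptable:

chunk_size = 200000

for start in range(0, len(dfpath), chunk_size):
    # Take a 200k-row slice of the original pandas DataFrame
    chunk = dfpath.iloc[start:start + chunk_size]
    chunk_ddf = dd.from_pandas(chunk, npartitions=16, sort=False)
    result = chunk_ddf.apply(calculate_stats, axis=1, args=(dfdaily, v1, v2, v3)) \
                      .compute(get=multiprocessing.get) \
                      .stack().reset_index(drop=True)
    # Append each chunk's results so nothing larger than one chunk stays in RAM;
    # only write the header for the first chunk
    result.to_csv('out.csv', mode='a', header=(start == 0), index=False)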
Upvotes: 1
Views: 480
Reputation: 57251
Sometimes tasks build up in RAM while waiting to write to a single file on disk. Sequential output like this is inherently tricky for parallel systems. If you need a single output file, then I recommend trying the same computation single-threaded to see if it makes a difference:
with dask.set_options(get=dask.async.get_sync):
    DF.to_csv('out.csv')
Alternatively (and far preferable) you could try writing out to many CSV files. This is much easier on the scheduler because tasks don't have to wait for their predecessors to complete before writing their results to disk and releasing them from RAM.
DF.to_csv('out.*.csv')
So a common and fairly robust way to execute and write in parallel is to combine your computation with a call to to_csv at the end:
ddf = dd.from_pandas(df, npartitions=100)
ddf.apply(myfunc, axis=1).to_csv('out.*.csv')
This will break up your dataframe into chunks, call your function on each chunk, write that chunk to disk, and then delete the intermediate value, freeing up space.
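Applied to the pipeline in your update, a sketch might look something like the following. It is not tested against your data; it assumes the same calculate_stats, dfdaily, and v1, v2, v3 arguments from your update, and the 'stats.*.csv' filename is just a placeholder:

import dask
import dask.dataframe as dd
import dask.multiprocessing

dfpath_ddf = dd.from_pandas(dfpath, npartitions=100, sort=False)
results = dfpath_ddf.apply(calculate_stats, axis=1, args=(dfdaily, v1, v2, v3))

# Writing one CSV per partition lets each partition be released from RAM as soon
# as it is written, instead of holding the full 1.4M-row result for one compute()
with dask.set_options(get=dask.multiprocessing.get):
    results.to_csv('stats.*.csv')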
Upvotes: 1