codingknob

Reputation: 11660

dask.async.MemoryError while running big data computation on EC2

I have an m4.4xlarge (64 GB RAM) EC2 box on which I'm running dask with pandas. I get the following memory error.

I get this after about 24 hours of running, which is approximately how long the task should take to complete, so I'm not sure whether the error is due to insufficient RAM, insufficient disk space (at the end of the script I call DF.to_csv() to write the large DF to disk), or an internal pandas/numpy memory limitation.

raise(remote_exception(res, tb))
    dask.async.MemoryError: 

Traceback
---------
  File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/dask/async.py", line 267, in execute_task
    result = _execute_task(task, data)
  File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/dask/async.py", line 248, in _execute_task
    args2 = [_execute_task(a, cache) for a in args]
  File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/dask/async.py", line 249, in _execute_task
    return func(*args2)
  File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/frame.py", line 4061, in apply
    return self._apply_standard(f, axis, reduce=reduce)
  File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/frame.py", line 4179, in _apply_standard
    result = result._convert(datetime=True, timedelta=True, copy=False)
  File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/generic.py", line 3004, in _convert
    copy=copy)).__finalize__(self)
  File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 2941, in convert
    return self.apply('convert', **kwargs)
  File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 2901, in apply
    bm._consolidate_inplace()
  File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 3278, in _consolidate_inplace
    self.blocks = tuple(_consolidate(self.blocks))
  File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 4269, in _consolidate
    _can_consolidate=_can_consolidate)
  File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 4289, in _merge_blocks
    new_values = _vstack([b.values for b in blocks], dtype)
  File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 4335, in _vstack
    return np.vstack(to_stack)
  File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/numpy/core/shape_base.py", line 230, in vstack
    return _nx.concatenate([atleast_2d(_m) for _m in tup], 0)

Update:

Some additional information in light of MRocklin's answer.

Here is how I execute the process:

import dask.dataframe as dd
from functools import partial

# calculate_stats and dfdaily are defined elsewhere; the get used in .compute()
# is the scheduler function passed in last (multiprocessing.get below)
def dask_stats_calc(dfpath, v1, v2, v3, ...):
    dfpath_ddf = dd.from_pandas(dfpath, npartitions=16, sort=False)
    return dfpath_ddf.apply(calculate_stats, axis=1,
                            args=(dfdaily, v1, v2, v3, ...)).compute(get=get).stack().reset_index(drop=True)

f_threaded = partial(dask_stats_calc, dfpath, v1, v2, v3, ..., multiprocessing.get)
f_threaded()

The thing is, dfpath is a DataFrame with 1.4 million rows, so dfpath_ddf.apply() runs over 1.4 million rows.

Once the entire dfpath_ddf.apply() is complete, a df.to_csv() occurs, but as you said, it's better to write to disk periodically.

Now the question is: how do I implement a periodic write to disk, say every 200k rows? I guess I could break dfpath_ddf up into 200k-row chunks (or something similar) and run each one sequentially, as in the sketch below?
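
Something like the following is what I have in mind (a rough, untested sketch: calculate_stats, dfdaily and v1, v2, v3 are from my code above, and the output file names are made up):

import dask.dataframe as dd

chunk_size = 200000
for i, start in enumerate(range(0, len(dfpath), chunk_size)):
    chunk = dfpath.iloc[start:start + chunk_size]
    chunk_ddf = dd.from_pandas(chunk, npartitions=16, sort=False)
    result = chunk_ddf.apply(calculate_stats, axis=1,
                             args=(dfdaily, v1, v2, v3)).compute()
    # write this block to its own file, then let it be garbage collected
    result.stack().reset_index(drop=True).to_csv('stats_chunk_%d.csv' % i)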

Upvotes: 1

Views: 480

Answers (1)

MRocklin

Reputation: 57251

Single threaded execution

Sometimes tasks build up in RAM while waiting to write to a single file on disk. Using a sequential output like this is inherently tricky with parallel systems. If you need to use a single file, then I recommend trying the same computation single-threaded to see if it makes a difference.

import dask.async  # brings in both dask and the synchronous scheduler

with dask.set_options(get=dask.async.get_sync):
    DF.to_csv('out.csv')
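
In the question's setup, where the scheduler's get function is already passed in as the last argument to dask_stats_calc, this amounts to swapping in the synchronous scheduler (a sketch only; v1, v2, v3 stand in for the question's full argument list):

from functools import partial
import dask.async

f_sync = partial(dask_stats_calc, dfpath, v1, v2, v3, dask.async.get_sync)
f_sync()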

Write to multiple files

Alternatively (and far preferable) you could try writing out to many CSV files. This is much easier on the scheduler because tasks don't have to wait for their predecessors to complete before writing to disk and freeing themselves from RAM.

DF.to_csv('out.*.csv')

Example

So a common and fairly robust way to execute and write in parallel is to combine your computation with a call to to_csv at the end:

import dask.dataframe as dd

ddf = dd.from_pandas(df, npartitions=100)
ddf.apply(myfunc, axis=1).to_csv('out.*.csv')  # dask's DataFrame.apply works row-wise (axis=1)

This will break up your dataframe into chunks, call your function on each chunk, write that chunk to disk, and then delete the intermediate value, freeing up space.
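
Applied to the setup in the question's update, that pattern might look roughly like this (a sketch only: calculate_stats, dfdaily and v1, v2, v3 come from the question, the partition count and output name are arbitrary, and any final stack()/reset_index() post-processing would happen after reading the pieces back):

import dask.dataframe as dd

dfpath_ddf = dd.from_pandas(dfpath, npartitions=100, sort=False)
(dfpath_ddf.apply(calculate_stats, axis=1, args=(dfdaily, v1, v2, v3))
           .to_csv('stats.*.csv'))  # each partition is written to disk, then freed from RAM

# the pieces can later be read back lazily, e.g.
# pieces = dd.read_csv('stats.*.csv')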

Upvotes: 1
