ML_Passion

Reputation: 1071

Error while exporting a dask dataframe to csv

My dask dataframe has about 120 million rows and 4 columns:

df_final.dtypes

cust_id           int64
score           float64
total_qty       float64
update_score    float64
dtype: object

and I'm doing this operation in a Jupyter notebook connected to a Linux machine:

%time df_final.to_csv('/path/claritin-files-*.csv')

and it throws this error:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-24-46468ae45023> in <module>()
----> 1 get_ipython().magic(u"time df_final.to_csv('path/claritin-files-*.csv')")

/home/mspra/anaconda2/lib/python2.7/site-packages/IPython/core/interactiveshell.pyc in magic(self, arg_s)
   2334         magic_name, _, magic_arg_s = arg_s.partition(' ')
   2335         magic_name = magic_name.lstrip(prefilter.ESC_MAGIC)
-> 2336         return self.run_line_magic(magic_name, magic_arg_s)
   2337 
   2338     #-------------------------------------------------------------------------

/home/mspra/anaconda2/lib/python2.7/site-packages/IPython/core/interactiveshell.pyc in run_line_magic(self, magic_name, line)
   2255                 kwargs['local_ns'] = sys._getframe(stack_depth).f_locals
   2256             with self.builtin_trap:
-> 2257                 result = fn(*args,**kwargs)
   2258             return result
   2259 

/home/mspra/anaconda2/lib/python2.7/site-packages/IPython/core/magics/execution.pyc in time(self, line, cell, local_ns)

/home/mspra/anaconda2/lib/python2.7/site-packages/IPython/core/magic.pyc in <lambda>(f, *a, **k)
    191     # but it's overkill for just that one bit of state.
    192     def magic_deco(arg):
--> 193         call = lambda f, *a, **k: f(*a, **k)
    194 
    195         if callable(arg):

/home/mspra/anaconda2/lib/python2.7/site-packages/IPython/core/magics/execution.pyc in time(self, line, cell, local_ns)
   1161         if mode=='eval':
   1162             st = clock2()
-> 1163             out = eval(code, glob, local_ns)
   1164             end = clock2()
   1165         else:

<timed eval> in <module>()

/home/mspra/anaconda2/lib/python2.7/site-packages/dask/dataframe/core.pyc in to_csv(self, filename, **kwargs)
    936         """ See dd.to_csv docstring for more information """
    937         from .io import to_csv
--> 938         return to_csv(self, filename, **kwargs)
    939 
    940     def to_delayed(self):

/home/mspra/anaconda2/lib/python2.7/site-packages/dask/dataframe/io/csv.pyc in to_csv(df, filename, name_function, compression, compute, get, **kwargs)
    411     if compute:
    412         from dask import compute
--> 413         compute(*values, get=get)
    414     else:
    415         return values

/home/mspra/anaconda2/lib/python2.7/site-packages/dask/base.pyc in compute(*args, **kwargs)
    177         dsk = merge(var.dask for var in variables)
    178     keys = [var._keys() for var in variables]
--> 179     results = get(dsk, keys, **kwargs)
    180 
    181     results_iter = iter(results)

/home/mspra/anaconda2/lib/python2.7/site-packages/dask/threaded.pyc in get(dsk, result, cache, num_workers, **kwargs)
     74     results = get_async(pool.apply_async, len(pool._pool), dsk, result,
     75                         cache=cache, get_id=_thread_get_id,
---> 76                         **kwargs)
     77 
     78     # Cleanup pools associated to dead threads

/home/mspra/anaconda2/lib/python2.7/site-packages/dask/async.pyc in get_async(apply_async, num_workers, dsk, result, cache, get_id, raise_on_exception, rerun_exceptions_locally, callbacks, dumps, loads, **kwargs)
    491                     _execute_task(task, data)  # Re-execute locally
    492                 else:
--> 493                     raise(remote_exception(res, tb))
    494             state['cache'][key] = res
    495             finish_task(dsk, key, state, results, keyorder.get)

ValueError: invalid literal for long() with base 10: 'total_qty'

Traceback
---------
  File "/home/mspra/anaconda2/lib/python2.7/site-packages/dask/async.py", line 268, in execute_task
    result = _execute_task(task, data)
  File "/home/mspra/anaconda2/lib/python2.7/site-packages/dask/async.py", line 249, in _execute_task
    return func(*args2)
  File "/home/mspra/anaconda2/lib/python2.7/site-packages/dask/dataframe/io/csv.py", line 55, in pandas_read_text
    coerce_dtypes(df, dtypes)
  File "/home/mspra/anaconda2/lib/python2.7/site-packages/dask/dataframe/io/csv.py", line 83, in coerce_dtypes
    df[c] = df[c].astype(dtypes[c])
  File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/core/generic.py", line 3054, in astype
    raise_on_error=raise_on_error, **kwargs)
  File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 3189, in astype
    return self.apply('astype', dtype=dtype, **kwargs)
  File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 3056, in apply
    applied = getattr(b, f)(**kwargs)
  File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 461, in astype
    values=values, **kwargs)
  File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 504, in _astype
    values = _astype_nansafe(values.ravel(), dtype, copy=True)
  File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/types/cast.py", line 534, in _astype_nansafe
    return lib.astype_intsafe(arr.ravel(), dtype).reshape(arr.shape)
  File "pandas/lib.pyx", line 980, in pandas.lib.astype_intsafe (pandas/lib.c:17409)
  File "pandas/src/util.pxd", line 93, in util.set_value_at_unsafe (pandas/lib.c:72777)

I have a couple of questions:

1) First of all, this export was working fine on Friday: it spit out 100 csv files (since the dataframe has 100 partitions), which I later aggregated. So what is wrong today? Does anything in the error log explain it?

2) Maybe this question is for the creators of this package: what is the most time-efficient way to get a csv extract out of a dask dataframe of this size? The last time it worked, it took about 1.5 to 2 hours.

I'm not using dask distributed, and this is running on a single core of a Linux cluster.

Upvotes: 1

Views: 2737

Answers (1)

MRocklin

Reputation: 57251

This error likely has little to do with to_csv and more to do with something else in your computation. The call to df.to_csv was just the first time you forced the computation to roll through all of the data.
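
For illustration, here is a minimal sketch of that laziness (the path and filter are hypothetical, not taken from your code):

import dask.dataframe as dd

df = dd.read_csv('/path/claritin-input-*.csv')  # hypothetical path; only a small
                                                # sample is read to infer dtypes
df2 = df[df.total_qty > 0]                      # hypothetical filter; still lazy
df2.to_csv('/path/out-*.csv')                   # the whole task graph runs here, so an
                                                # error anywhere upstream surfaces now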

Given the error, I actually suspect that this is failing in read_csv. Dask.dataframe reads the first few hundred kilobytes of your first file to guess at the datatypes, but it seems to have guessed incorrectly. You might want to try specifying dtypes explicitly in the read_csv call, as in the sketch below.
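
For example (a minimal sketch, not your exact code: the input path is hypothetical, and the dtypes are the ones shown in your df_final.dtypes output):

import dask.dataframe as dd

# Passing dtypes explicitly skips dask's sample-based inference;
# the mapping below mirrors the df_final.dtypes output above.
df = dd.read_csv(
    '/path/claritin-input-*.csv',  # hypothetical input path
    dtype={
        'cust_id': 'int64',
        'score': 'float64',
        'total_qty': 'float64',
        'update_score': 'float64',
    },
)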

Regarding the second question about writing to CSV quickly, my first answer would be "use Parquet or HDF5 instead". They're much faster and more accurate in almost every respect. A sketch of the Parquet route follows.
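
As a rough sketch (the output directory is hypothetical, and to_parquet assumes a Parquet engine such as fastparquet or pyarrow is installed):

import dask.dataframe as dd

# Parquet stores the schema in its file metadata, so dtypes never
# need to be re-inferred when the data is read back.
df_final.to_parquet('/path/claritin-parquet')    # hypothetical output directory

df2 = dd.read_parquet('/path/claritin-parquet')  # round-trips with dtypes intact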

Upvotes: 1
