DGrady
DGrady

Reputation: 1105

How do you use dask + distributed for NFS files?

Working from Matthew Rocklin's post on distributed data frames with Dask, I'm trying to distribute some summary statistics calculations across my cluster. Setting up the cluster with dcluster ... works fine. Inside a notebook,

import dask.dataframe as dd
from distributed import Executor, progress
e = Executor('...:8786')

df = dd.read_csv(...)

The file I'm reading is on an NFS mount that all the worker machines have access to. At this point I can look at df.head() for example and everything looks correct. From the blog post, I think I should be able to do this:

df_future = e.persist(df)
progress(df_future)
# ... wait for everything to load ...
df_future.head()

But that's an error:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-26-8d59adace8bf> in <module>()
----> 1 fraudf.head()

/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/dataframe/core.py in head(self, n, compute)
    358 
    359         if compute:
--> 360             result = result.compute()
    361         return result
    362 

/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/base.py in compute(self, **kwargs)
     35 
     36     def compute(self, **kwargs):
---> 37         return compute(self, **kwargs)[0]
     38 
     39     @classmethod

/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/base.py in compute(*args, **kwargs)
    108                 for opt, val in groups.items()])
    109     keys = [var._keys() for var in variables]
--> 110     results = get(dsk, keys, **kwargs)
    111 
    112     results_iter = iter(results)

/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, **kwargs)
     55     results = get_async(pool.apply_async, len(pool._pool), dsk, result,
     56                         cache=cache, queue=queue, get_id=_thread_get_id,
---> 57                         **kwargs)
     58 
     59     return results

/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/async.py in get_async(apply_async, num_workers, dsk, result, cache, queue, get_id, raise_on_exception, rerun_exceptions_locally, callbacks, **kwargs)
    479                 _execute_task(task, data)  # Re-execute locally
    480             else:
--> 481                 raise(remote_exception(res, tb))
    482         state['cache'][key] = res
    483         finish_task(dsk, key, state, results, keyorder.get)

AttributeError: 'Future' object has no attribute 'head'

Traceback
---------
  File "/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/async.py", line 264, in execute_task
    result = _execute_task(task, data)
  File "/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/async.py", line 246, in _execute_task
    return func(*args2)
  File "/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/dataframe/core.py", line 354, in <lambda>
    dsk = {(name, 0): (lambda x, n: x.head(n=n), (self._name, 0), n)}

What's the right approach to distributing a data frame when it comes from a normal file system instead of HDFS?

Upvotes: 2

Views: 458

Answers (1)

MRocklin
MRocklin

Reputation: 57301

Dask is trying to use the single-machine scheduler, which is the default if you create a dataframe using the normal dask library. Switch the default to use your cluster with the following lines:

import dask
dask.set_options(get=e.get)

Upvotes: 1

Related Questions