Reputation: 1105
Working from Matthew Rocklin's post on distributed data frames with Dask, I'm trying to distribute some summary statistics calculations across my cluster. Setting up the cluster with dcluster ...
works fine. Inside a notebook,
import dask.dataframe as dd
from distributed import Executor, progress
e = Executor('...:8786')
df = dd.read_csv(...)
The file I'm reading is on an NFS mount that all the worker machines have access to. At this point I can look at df.head()
for example and everything looks correct. From the blog post, I think I should be able to do this:
df_future = e.persist(df)
progress(df_future)
# ... wait for everything to load ...
df_future.head()
But that's an error:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-26-8d59adace8bf> in <module>()
----> 1 fraudf.head()
/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/dataframe/core.py in head(self, n, compute)
358
359 if compute:
--> 360 result = result.compute()
361 return result
362
/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/base.py in compute(self, **kwargs)
35
36 def compute(self, **kwargs):
---> 37 return compute(self, **kwargs)[0]
38
39 @classmethod
/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/base.py in compute(*args, **kwargs)
108 for opt, val in groups.items()])
109 keys = [var._keys() for var in variables]
--> 110 results = get(dsk, keys, **kwargs)
111
112 results_iter = iter(results)
/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, **kwargs)
55 results = get_async(pool.apply_async, len(pool._pool), dsk, result,
56 cache=cache, queue=queue, get_id=_thread_get_id,
---> 57 **kwargs)
58
59 return results
/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/async.py in get_async(apply_async, num_workers, dsk, result, cache, queue, get_id, raise_on_exception, rerun_exceptions_locally, callbacks, **kwargs)
479 _execute_task(task, data) # Re-execute locally
480 else:
--> 481 raise(remote_exception(res, tb))
482 state['cache'][key] = res
483 finish_task(dsk, key, state, results, keyorder.get)
AttributeError: 'Future' object has no attribute 'head'
Traceback
---------
File "/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/async.py", line 264, in execute_task
result = _execute_task(task, data)
File "/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/async.py", line 246, in _execute_task
return func(*args2)
File "/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/dataframe/core.py", line 354, in <lambda>
dsk = {(name, 0): (lambda x, n: x.head(n=n), (self._name, 0), n)}
What's the right approach to distributing a data frame when it comes from a normal file system instead of HDFS?
Upvotes: 2
Views: 458
Reputation: 57301
Dask is trying to use the single-machine scheduler, which is the default if you create a dataframe using the normal dask library. Switch the default to use your cluster with the following lines:
import dask
dask.set_options(get=e.get)
Upvotes: 1