Reputation: 2261
I have one powerful machine (remote machine), accessible through SSH. My data is stored on that remote machine.
I want to run computations on, and access data from, the remote machine. To do this, I started a dask-scheduler and a dask-worker on the remote machine, then ran a Jupyter notebook on my laptop (local machine) with client = Client('scheduler-ip:8786'), but it still refers to data on the local machine, not on the remote machine.
How do I refer to data on the remote machine from a notebook running on the local machine?
import dask.dataframe as dd
from dask.distributed import Client
client = Client('remote-ip:8786')
ddf = dd.read_csv(
'remote-machine-file.csv',
header=None,
assume_missing=True,
dtype=object,
)
It fails with
---------------------------------------------------------------------------
IndexError Traceback (most recent call last)
<ipython-input-37-17d26dadb3a8> in <module>
----> 1 ddf = dd.read_csv('remote-machine-file.csv', header=None, assume_missing=True, dtype=object)
/usr/local/conda/lib/python3.7/site-packages/dask/dataframe/io/csv.py in read(urlpath, blocksize, lineterminator, compression, sample, sample_rows, enforce, assume_missing, storage_options, include_path_column, **kwargs)
735 storage_options=storage_options,
736 include_path_column=include_path_column,
--> 737 **kwargs,
738 )
739
/usr/local/conda/lib/python3.7/site-packages/dask/dataframe/io/csv.py in read_pandas(reader, urlpath, blocksize, lineterminator, compression, sample, sample_rows, enforce, assume_missing, storage_options, include_path_column, **kwargs)
520
521 # Infer compression from first path
--> 522 compression = infer_compression(paths[0])
523
524 if blocksize == "default":
IndexError: list index out of range
Upvotes: 2
Views: 357
Reputation: 15432
When using dask.dataframe with a distributed.Client, the majority of the I/O is done by the remote workers, but dask does rely on the client machine being able to access the data for scheduling.
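One quick way to see this is to check where the file is actually visible. This is just a diagnostic sketch, reusing the scheduler address and file path from the question; use an absolute path if the worker's working directory differs.
import os
from dask.distributed import Client

client = Client('remote-ip:8786')

# client.run executes a function on every worker and returns
# a dict keyed by worker address, e.g. {'tcp://...': True}
print(client.run(os.path.exists, 'remote-machine-file.csv'))

# the same check on the local client, where dd.read_csv actually tried
# to resolve the path (and found nothing)
print(os.path.exists('remote-machine-file.csv'))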
To run anything purely on the worker, you can always have the worker schedule the operation, e.g. with:
import dask.dataframe as dd
from dask.distributed import Client

client = Client('remote-ip:8786')  # connect to the scheduler on the remote machine

# use the client to have the worker run the dask.dataframe command!
fp = 'remote-machine-file.csv'  # path as seen by the worker
f = client.submit(dd.read_csv, fp)

# because the worker is holding a dask dataframe object, requesting
# the result brings the dask.dataframe object/metadata to the
# local client, while leaving the data on the remote machine
df = f.result()
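From here, operations on df are still scheduled onto the remote workers, which are the machines that can actually see the file; only small results come back to the laptop. A minimal usage sketch:
# the task graph held by `df` executes on the remote workers,
# so the CSV bytes never have to leave the remote machine
print(df.npartitions)
print(df.head())  # only the small head() result is transferred to the client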
Alternatively, you can partition the job manually yourself: e.g. if you have many files, read them into memory on the workers, and finally construct the dask dataframe locally with dask.dataframe.from_delayed:
import pandas as pd
import dask.dataframe as dd

files_on_remote = ['data/file_{}.csv'.format(i) for i in range(100)]

# have the workers read the data with pandas
futures = client.map(pd.read_csv, files_on_remote)

# use dask.dataframe.from_delayed to construct a dask.dataframe from the
# remote pandas objects
df = dd.from_delayed(futures)
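At this point the partitions are pandas objects already held in worker memory, so follow-up computations run on the cluster and only their (small) results are transferred back. A brief sketch, assuming the hypothetical file list above:
# len() runs a reduction across the remote partitions; only the
# resulting integer is sent back to the local client
n_rows = len(df)
print(n_rows)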
Upvotes: 2