Mrityunjay

Reputation: 2261

Read with dask.dataframe when file not accessible from local machine

I have one powerful machine (a remote machine), accessible through SSH. My data is stored on the remote machine.

I want to run computations on and access data on the remote machine. For this, I ran a dask-scheduler and a dask-worker on the remote machine. Then I ran a jupyter notebook on my laptop (the local machine) with client = Client('scheduler-ip:8786'), but it still refers to data on the local machine, not on the remote machine.

How do I refer to data on the remote machine from a notebook running on the local machine?

import dask.dataframe as dd
from dask.distributed import Client

client = Client('remote-ip:8786')

ddf = dd.read_csv(
    'remote-machine-file.csv',
    header=None,
    assume_missing=True,
    dtype=object,
)

It fails with

---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
<ipython-input-37-17d26dadb3a8> in <module>
----> 1 ddf = dd.read_csv('remote-machine-file.csv', header=None, assume_missing=True, dtype=object)

/usr/local/conda/lib/python3.7/site-packages/dask/dataframe/io/csv.py in read(urlpath, blocksize, lineterminator, compression, sample, sample_rows, enforce, assume_missing, storage_options, include_path_column, **kwargs)
    735             storage_options=storage_options,
    736             include_path_column=include_path_column,
--> 737             **kwargs,
    738         )
    739 

/usr/local/conda/lib/python3.7/site-packages/dask/dataframe/io/csv.py in read_pandas(reader, urlpath, blocksize, lineterminator, compression, sample, sample_rows, enforce, assume_missing, storage_options, include_path_column, **kwargs)
    520 
    521         # Infer compression from first path
--> 522         compression = infer_compression(paths[0])
    523 
    524     if blocksize == "default":

IndexError: list index out of range

Upvotes: 2

Views: 357

Answers (1)

Michael Delgado

Reputation: 15432

When using dask.dataframe with a distributed.Client, the majority of the I/O is done by the remote workers, but dask still relies on the client machine being able to access the data for scheduling (e.g. to resolve file paths and infer metadata, which is why the glob in read_csv comes back empty and raises the IndexError above).

To run anything purely on the worker, you can always have a worker run the operation itself, e.g. with:

import dask.dataframe as dd
from dask.distributed import Client

# connect to the remote scheduler (address from the question)
client = Client('remote-ip:8786')

# path as seen from the remote machine
fp = 'remote-machine-file.csv'

# use the client to have a worker run the dask.dataframe command!
f = client.submit(dd.read_csv, fp)

# because the worker is holding a dask dataframe object, requesting
# the result brings the dask.dataframe object/metadata to the
# local client, while leaving the data on the remote machine
df = f.result()
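
Once the handle is back on the client it behaves like any other dask dataframe, and computations are still executed on the workers that can see the file. A minimal sketch of using it, assuming the setup above:

# the task graph was built on the worker, so the file path inside it is
# resolved on the remote machine when the tasks actually run
print(df.head())   # reads a small sample on a worker, returns a pandas frame
print(len(df))     # total row count, computed on the workers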

Alternatively, you can partition the job manually: if you have many files, have the workers read them into memory with pandas, and then construct the dask dataframe locally with dask.dataframe.from_delayed:

import pandas as pd
import dask.dataframe as dd

files_on_remote = ['data/file_{}.csv'.format(i) for i in range(100)]

# have the workers read the data with pandas
futures = client.map(pd.read_csv, files_on_remote)

# use dask.dataframe.from_delayed to construct a dask.dataframe from the
# remote pandas objects
df = dd.from_delayed(futures)
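
If the schema matters, dd.from_delayed can also be given an explicit meta, which saves dask from materialising the first partition just to infer column names and dtypes. A sketch under the assumption that each file has two columns, a and b (hypothetical names):

import pandas as pd
import dask.dataframe as dd

# hypothetical schema -- replace the column names/dtypes with the real ones
meta = pd.DataFrame({'a': pd.Series(dtype='float64'),
                     'b': pd.Series(dtype='object')})

df = dd.from_delayed(futures, meta=meta)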

Upvotes: 2
