JuanPabloMF

Reputation: 497

Slow len function on dask distributed dataframe

I have been testing how to use dask (on a cluster with 20 cores), and I am surprised by the difference in speed between calling len and slicing through loc.

import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client
client = Client('192.168.1.220:8786')

log = pd.read_csv('800000test', sep='\t')
logd = dd.from_pandas(log, npartitions=20)

#This is the code that runs slowly
#(2.9 seconds, whilst I would expect no more than a few hundred milliseconds)

print(len(logd))

#Instead this code is actually running almost 20 times faster than pandas
logd.loc[:'Host'].count().compute()

Any ideas why this could be happening? It isn't important for me that len runs fast, but I feel that by not understanding this behaviour there is something I am not grasping about the library.


In my call graph, all of the green boxes correspond to "from_pandas", whereas in this article by Matthew Rocklin, http://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes, the call graph looks better: len_chunk is called, which is significantly faster, and the calls don't seem to block while waiting for one worker to finish its task before the next one starts.


Upvotes: 16

Views: 5555

Answers (1)

MRocklin

Reputation: 57251

Good question. This gets at a few points about when data moves up to the cluster and back down to the client (your Python session). Let's look at a few stages of your computation.

Load data with Pandas

This is a Pandas dataframe in your python session, so it's obviously still in your local process.

log = pd.read_csv('800000test', sep='\t')  # on client

Convert to a lazy Dask.dataframe

This breaks up your Pandas dataframe into twenty Pandas dataframes; however, these are still on the client. Dask dataframes don't eagerly send data up to the cluster.

logd = dd.from_pandas(log,npartitions=20)  # still on client

Compute len

Calling len actually causes computation here (normally you would use df.some_aggregation().compute()). So now Dask kicks in. First it moves your data out to the cluster (slow), then it calls len on each of the 20 partitions (fast), aggregates those (fast), and then moves the result down to your client so that it can print.

print(len(logd))  # costly roundtrip client -> cluster -> client

Analysis

So the problem here is that our dask.dataframe still had all of its data in the local python session.

It would have been much faster to use, say, the local threaded scheduler rather than the distributed scheduler. This should compute in milliseconds:

import dask
# older Dask releases spelled this dask.set_options(get=dask.threaded.get)
with dask.config.set(scheduler='threads'):  # no cluster, just local threads
    print(len(logd))  # stays on client

But presumably you want to know how to scale out to larger datasets, so let's do this the right way.

Load your data on the workers

Instead of loading with Pandas on your client/local session, let the Dask workers load bits of the csv file. This way no client-worker communication is necessary.

# log = pd.read_csv('800000test', sep='\t')  # on client
log = dd.read_csv('800000test', sep='\t')    # on cluster workers

However, unlike pd.read_csv, dd.read_csv is lazy, so this should return almost immediately. We can force Dask to actually do the computation with the persist method:

log = client.persist(log)  # triggers computation asynchronously

Now the cluster kicks into action and loads your data directly in the workers. This is relatively fast. Note that this method returns immediately while work happens in the background. If you want to wait until it finishes, call wait.

from dask.distributed import wait
wait(log)  # blocks until read is done

If you're testing with a small dataset and want to get more partitions, try changing the blocksize.

log = dd.read_csv(..., blocksize=1000000)  # 1 MB blocks

Regardless, operations on log should now be fast:

len(log)  # fast

Edit

In response to a question on this blog post, here are the assumptions that we're making about where the file lives.

Generally when you provide a filename to dd.read_csv it assumes that that file is visible from all of the workers. This is true if you are using a network file system, or a global store like S3 or HDFS. If you are using a network file system then you will want to either use absolute paths (like /path/to/myfile.*.csv) or else ensure that your workers and client have the same working directory.

If this is not the case, and your data is only on your client machine, then you will have to load and scatter it out.

Simple but sub-optimal

The simple way is just to do what you did originally, but persist your dask.dataframe:

log = pd.read_csv('800000test', sep='\t')  # on client
logd = dd.from_pandas(log,npartitions=20)  # still on client
logd = client.persist(logd)  # moves to workers

This is fine, but results in slightly less-than-ideal communication.

Complex but optimal

Instead, you might scatter your data out to the cluster explicitly:

[future] = client.scatter([log])

This gets into a more complex API though, so I'll just point you to the docs:

http://distributed.readthedocs.io/en/latest/manage-computation.html
http://distributed.readthedocs.io/en/latest/memory.html
http://dask.pydata.org/en/latest/delayed-collections.html

Upvotes: 15
