jhanv

Reputation: 79

Why does Dask take a long time to compute regardless of the size of the dataframe?

What is the reason that a Dask dataframe takes a long time to compute regardless of the size of the dataframe, and how can I avoid this from happening?

EDIT:

I'm currently working on AWS SageMaker with an ml.c5.2xlarge instance type, and the data is in an S3 bucket. I did not connect to a client because I was not able to: when I run the client through a LocalCluster I get this error --> AttributeError: 'MaterializedLayer' object has no attribute 'pack_annotations'

So I proceeded without connecting to anything specific, and it is therefore using the defaults (Cluster, Workers: 4, Cores: 8, Memory: 16.22 GB).

shape = df.shape             # lazy: the row count is a delayed scalar, the column count is an int
nrows = shape[0].compute()   # triggers the actual computation to get the row count
print("nrows", nrows)
print(df.npartitions)

I tried to perform the compute on the following inputs (the number of columns is 5 in all cases):

  • 24,700,000 rows (~24.7M), 23 partitions: CPU times: user 4min 48s, sys: 12.9 s, total: 5min 1s; Wall time: 4min 46s
  • 5,120,000 rows (~5M), 23 partitions: CPU times: user 4min 50s, sys: 12 s, total: 5min 2s; Wall time: 4min 46s
  • 7,697,351 rows (~7.7M), 1 partition: CPU times: user 5min 4s, sys: 10.6 s, total: 5min 14s; Wall time: 4min 52s
  • The same operations in Pandas on 7,690,000 rows (~7.7M): CPU times: user 502 µs, sys: 0 ns, total: 502 µs; Wall time: 402 µs

I'm just trying to find the shape of the data, but in Dask, regardless of the type of operation, a single compute() call takes roughly the same amount of time.

May I know what the reason behind this is, and what I need to do to avoid it and optimize the compute time?

Upvotes: 3

Views: 5097

Answers (2)

jhanv

Reputation: 79

The reason the Dask dataframe takes so long to compute (the shape or any other operation) is that when compute() is called, Dask performs every operation from the creation of the current dataframe (and its ancestors) up to the point where compute() is called.

In the scenario presented in the question, Dask has to read the data from an S3 bucket (which takes a reasonably long time). So whenever compute() is called (to find the shape or for any other operation), Dask performs all of the operations starting from reading the CSV file from S3, which adds to the time taken to execute.
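
For illustration, here is a minimal sketch (the S3 path is hypothetical, not from the question) of why the read shows up in every compute(): read_csv only builds a task graph, and each compute() runs that graph from the start, including the S3 read.

import dask.dataframe as dd

# Lazy: this only records "read these CSVs" in the task graph; nothing is fetched yet.
df = dd.read_csv("s3://my-bucket/data-*.csv")  # hypothetical path

# compute() executes the whole graph, so the S3 read happens here.
nrows = df.shape[0].compute()  # the row count requires reading every partition
ncols = df.shape[1]            # the column count comes from metadata, no compute needed

print("nrows", nrows, "ncols", ncols)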

compute() should be used sparingly, but in cases where a compute operation has to be performed over and over again on the current dataframe or its descendants, persisting the dataframe helps. persist() stores the data in (distributed) memory, so subsequent computations no longer execute all the operations from the dataframe's ancestors; they execute from the point where the data was persisted.
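
A rough sketch of the persist() pattern (the S3 path and column name are hypothetical; with a distributed Client the partitions stay in cluster memory, and with the default scheduler they are materialised into local RAM):

import dask.dataframe as dd

df = dd.read_csv("s3://my-bucket/data-*.csv")  # hypothetical path
df = df.persist()  # read from S3 once and keep the partitions in memory

# These now run against the persisted partitions instead of re-reading from S3.
nrows = df.shape[0].compute()
total = df["some_column"].sum().compute()  # hypothetical column name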

Upvotes: 2

SultanOrazbayev

Reputation: 16551

In general, a given computation will have parts that can be distributed (parallelised) and parts that have to be done sequentially; see Amdahl's law. If a given algorithm has a large serial component, then the gains from distributing/scaling are going to be small.
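
As a rough reference (notation not from the original answer), Amdahl's law gives the speedup S from N workers when a fraction p of the work can be parallelised:

S(N) = \frac{1}{(1 - p) + p/N}, \qquad S(N) \to \frac{1}{1 - p} \ \text{as} \ N \to \infty

so if the serial fraction (1 - p) is large, the attainable speedup stays small no matter how many workers are added.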

Without knowing the specifics of your task graph, it's hard to say what exactly is causing the bottleneck, but more broadly there could be several reasons for slow performance even with relatively small inputs:

  • computations that generate a lot of partitions or involve communication across multiple partitions (e.g. sorting, consistently encoding categorical columns);
  • slow IO (e.g. if the dataframe has to be constructed by polling a remote database using a slow network connection);
  • computations that involve simulations with a fixed cost of construction (e.g. if the dataframe is a list of edges that should be checked against a specific random graph that has to be constructed for multiple seeds).

The factors listed above will still scale with the data, so this is not exactly an answer to your question ("regardless of the size of the dataframe"), but it might help.

To resolve (or avoid) this problem, one typically has to examine the algorithm/code, identify the performance bottlenecks, and figure out whether they can be parallelised or are inherently serial.
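
A small diagnostic sketch under assumptions not in the original answer (the S3 path is hypothetical; df.visualize needs graphviz, the profile report needs bokeh, and the single-machine profilers below only apply when no distributed Client is attached, which matches the question's default setup):

import dask.dataframe as dd
from dask.diagnostics import Profiler, ResourceProfiler, visualize

df = dd.read_csv("s3://my-bucket/data-*.csv")  # hypothetical path

print(df.npartitions)               # how many parallel pieces the work splits into
df.visualize(filename="graph.svg")  # render the task graph to spot long serial chains

# Profile a single compute() to see which tasks dominate the runtime.
with Profiler() as prof, ResourceProfiler() as rprof:
    df.shape[0].compute()

visualize([prof, rprof], filename="profile.html", show=False)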

Upvotes: 2
