rpanai

Reputation: 13437

Dask performance: workflow doubts

I'm confused about how to get the best from dask.

The problem: I have a dataframe which contains several timeseries (each with its own key) and I need to run a function my_fun on each of them. One way to solve it with pandas is df = list(df.groupby("key")) and then apply my_fun with multiprocessing. The performance, despite the heavy RAM usage, is pretty good on my machine but terrible on Google Cloud Compute.
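
For reference, a minimal sketch of that pandas + multiprocessing route, assuming a "value" column and a placeholder my_fun (the real function is not shown in the question):

    from multiprocessing import Pool

    import pandas as pd

    def my_fun(group):
        # placeholder per-timeseries computation; the real my_fun is not shown here
        return group["value"].mean()

    def process_all(df, n_workers=4):
        # materialise every group as its own DataFrame -- this is where the RAM goes
        keys, groups = zip(*df.groupby("key"))
        with Pool(n_workers) as pool:
            results = pool.map(my_fun, groups)
        return pd.Series(results, index=keys)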

On Dask my current workflow is:

    import dask.dataframe as dd
    from dask.multiprocessing import get

  1. Read data from S3: 14 files -> 14 partitions
  2. `df.groupby("key").apply(my_fun).to_frame().compute(get=get)`

As I didn't set an index, df.known_divisions is False.
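
Put together, the workflow above looks roughly like this; the S3 path, the "value" column and my_fun are placeholders, and on recent Dask versions compute(get=get) is spelled compute(scheduler="processes"):

    import dask.dataframe as dd
    from dask.multiprocessing import get

    def my_fun(group):
        # placeholder per-timeseries computation
        return group["value"].mean()

    # 1. Read data from S3: 14 files -> 14 partitions (the path is a placeholder)
    df = dd.read_csv("s3://my-bucket/timeseries-*.csv")
    print(df.npartitions)      # 14
    print(df.known_divisions)  # False, since no index was set

    # 2. Apply my_fun per key; `meta` tells Dask what the output looks like
    result = (
        df.groupby("key")
          .apply(my_fun, meta=("my_fun", "f8"))
          .to_frame()
          .compute(get=get)
    )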

The resulting task graph is shown in the attached screenshot (image omitted here), and I don't understand whether what I see is a bottleneck or not.

Questions:

  1. Is it better to have df.npartitions as a multiple of ncpu, or does it not matter?
  2. From this it seems that it is better to set the index to key. My guess is that I can do something like

    df["key2"] = df["key"] df = df.set_index("key2")

but, again, I don't know if this is the best way to do it.

Upvotes: 4

Views: 323

Answers (1)

mdurant

Reputation: 28683

For questions like "what is taking time" in Dask, the general recommendation is to use the "distributed" scheduler rather than multiprocessing - you can still run with any number of processes/threads you like, but you have much more information available via the diagnostics dashboard.
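
A minimal sketch of that, assuming a single-machine LocalCluster (the worker/thread counts are just examples):

    from dask.distributed import Client

    # Start a local "distributed" cluster; tune the process/thread mix as you like
    client = Client(n_workers=4, threads_per_worker=1)
    print(client.dashboard_link)   # diagnostics dashboard, usually http://localhost:8787

    # Once a Client exists, .compute() uses the distributed scheduler by default,
    # so the same groupby/apply (df and my_fun as in the question) can be watched live
    result = df.groupby("key").apply(my_fun, meta=("my_fun", "f8")).compute()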

For your specific questions: if you are grouping over a column that is not nicely split between partitions and applying anything other than simple aggregations, you will inevitably need a shuffle. Setting the index does this shuffle for you as an explicit step; otherwise you get the implicit shuffle apparent in your task graph. This is a many-to-many operation: each aggregation task needs input from every original partition, hence the bottleneck. There is no getting around that.
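
For example, a hedged sketch of the explicit-shuffle route; after set_index every key lives in a single partition, so the group-wise work can then run partition-by-partition (df, my_fun and the meta are assumptions carried over from the question):

    # Pay the shuffle cost once, up front: set_index is the explicit shuffle
    df = df.set_index("key")
    print(df.known_divisions)   # True after set_index

    # Each key now sits in exactly one partition, so the group-wise apply can be
    # done inside each partition with plain pandas, without a second shuffle
    result = df.map_partitions(
        lambda part: part.groupby(level=0).apply(my_fun),
        meta=("my_fun", "f8"),
    ).compute()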

As for the number of partitions, yes, you can have sub-optimal conditions like 9 partitions on 8 cores (you will compute 8 tasks, and then perhaps block for the final task on one core while the others are idle); but in general you can depend on Dask to make reasonable scheduling decisions so long as you are not using a very small number of partitions. In many cases, it will not matter much.
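
If you do want partition counts to line up with cores, repartitioning is straightforward to express (the factor of 2 below is arbitrary):

    import multiprocessing

    ncpu = multiprocessing.cpu_count()
    # e.g. go from 14 partitions to a small multiple of the core count
    df = df.repartition(npartitions=2 * ncpu)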

Upvotes: 5
