Reputation: 13437
I'm confused about how to get the best from dask.
The problem
I have a dataframe that contains several timeseries (each one has its own key
) and I need to run a function my_fun
on each of them. One way to solve it with pandas involves
df = list(df.groupby("key"))
and then applying my_fun
with multiprocessing. Performance, despite the heavy RAM usage, is pretty good on my machine but terrible on Google Cloud Compute.
On Dask my current workflow is:
import dask.dataframe as dd
from dask.multiprocessing import get
As I didn't set an index, df.known_divisions
is False
The resulting graph is
and I don't understand whether what I see is a bottleneck or not.
Questions:
Is it better to set df.npartitions
as a multiple of ncpu
, or does it not matter? From this it seems that it is better to set the index to key. My guess is that I can do something like
df["key2"] = df["key"]
df = df.set_index("key2")
but, again, I don't know if this is the best way to do it.
Upvotes: 4
Views: 323
Reputation: 28683
For questions like "what is taking time" in Dask, you are generally recommended to use the "distributed" scheduler rather than multiprocessing — you can still run with any number of processes/threads you like, but you have much more information available via the diagnostics dashboard.
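A minimal sketch of that setup (a local cluster stands in here for whatever deployment you use; the worker counts are arbitrary):

```python
from dask.distributed import Client

# start a small local cluster; processes=False keeps everything in one
# process so this sketch runs anywhere without a __main__ guard
client = Client(n_workers=2, threads_per_worker=1, processes=False)
print(client.dashboard_link)  # URL of the diagnostics dashboard

# any dask work submitted through this client shows up live in the dashboard
doubled = client.submit(lambda x: 2 * x, 21).result()
client.close()
```

Swap processes=False for processes=True (the default) to get separate worker processes, closer to what multiprocessing gave you.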
For your specific questions: if you are grouping over a column that is not nicely split between partitions and applying anything other than simple aggregations, you will inevitably need a shuffle. Setting the index does this shuffle for you as an explicit step; otherwise you get the implicit shuffle apparent in your task graph. This is a many-to-many operation — each aggregation task needs input from every original partition — hence the bottleneck. There is no getting around that.
As for the number of partitions: yes, you can have sub-optimal conditions like 9 partitions on 8 cores (you will compute 8 tasks, and then perhaps block on the final task on one core while the others are idle); but in general you can depend on dask to make reasonable scheduling decisions so long as you are not using a very small number of partitions. In many cases it will not matter much.
Upvotes: 5