Matěj Račinský

Reputation: 1804

Dask dataframe join as slow as pandas

I have two dataframes, one called animes with ~10k rows and one called animelists with ~30M rows, and I want to join them. I benchmarked the join against pandas, and Dask gives only around a 7% speedup, which is not much; I wonder whether it can be made faster, given that I have 16 cores.

I have pandas DataFrames, on which I set the index:

animes = animes.set_index('anime_id')
animelists = animelists.set_index('anime_id')

The data looks like this (I omitted the other columns). animes:

anime_id | genres
-------- | ------
11013    | Comedy, Supernatural, Romance, Shounen
2104     | Comedy, Parody, Romance, School, Shounen
5262     | Comedy, Magic, School, Shoujo

and animelists:

anime_id | username | my_score
-------- | -------- | --------
21       | karthiga | 9
59       | karthiga | 7
74       | karthiga | 7

and then I created Dask DataFrames from them:

import dask.dataframe as dd

animes_dd = dd.from_pandas(animes, npartitions=8)
animelists_dd = dd.from_pandas(animelists, npartitions=8)

I want to effectively join the individual anime genres with animelists so that I can query scores by genre. Here is the code that does this in pandas:

# explode the comma-separated genres into one row per (anime_id, genre) pair
genres_arr = animes['genres'].str.replace(' ', '').str.split(',', expand=True).stack().reset_index(drop=True, level=1).to_frame(name='genre')
genres_arr = genres_arr[genres_arr['genre'] != '']
# inner join with the user scores on anime_id (the index)
resulting_df = animelists.merge(genres_arr, how='inner', left_index=True, right_index=True)
# this takes 1min 37s

and the same code in Dask:

# the same genre explosion, applied per partition
genres_arr_dd = animes_dd['genres'].map_partitions(lambda x: x.str.replace(' ', '').str.split(',', expand=True).stack().reset_index(drop=True, level=1)).to_frame(name='genre')
genres_arr_dd = genres_arr_dd[genres_arr_dd['genre'] != '']
# merge on the index and pull the full result back into a pandas DataFrame
resulting_dd = animelists_dd.merge(genres_arr_dd, how='inner', left_index=True, right_index=True).compute()
# this takes 1min 30s

(the resulting dataframe has ~140M rows)

Is there any way to speed it up further? I followed the official performance guide: I perform the join on indexed columns and use 8 partitions in each Dask DataFrame, so it should be set up for an efficient parallel join.

What is wrong here, and how can I speed it up further?

When I ran the code in a Jupyter notebook, I watched the per-core CPU utilization. It was very low, and some of the time only one core was active, running at 100%. It seems that the work does not parallelize well.

Upvotes: 1

Views: 2825

Answers (1)

mdurant

Reputation: 28673

This has been repeated elsewhere, so I will keep it very brief.

  • from_pandas -> compute means that you are round-tripping all the data; you want to load in the workers (e.g., with dd.read_csv) and aggregate in the workers, not move whole datasets to and from them (see the read_csv sketch after this list)

  • the choice of scheduler is very important. If your system monitor says you are using only one CPU, you are probably limited by the GIL and should try the distributed scheduler with an appropriate process/thread mix (see the Client sketch after this list). Its dashboard will also give you more diagnostics about what is going on

  • Pandas is fast, and when the data is small, the additional overhead of dask, although also small, may outweigh any parallelism you get.
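
To illustrate the first bullet with a minimal sketch: load the data directly in the workers and reduce it to a small aggregate before calling compute(), so the ~140M-row intermediate never travels back to the client. The CSV file names and the mean-score-per-genre aggregation below are only assumptions for the example, not part of the question.

import dask.dataframe as dd

# hypothetical file paths -- point these at wherever the source data actually lives
animes_dd = dd.read_csv('animes.csv').set_index('anime_id')
animelists_dd = dd.read_csv('animelists_*.csv').set_index('anime_id')

# explode the comma-separated genres per partition, as in the question
genres_arr_dd = animes_dd['genres'].map_partitions(
    lambda s: s.str.replace(' ', '').str.split(',', expand=True)
               .stack().reset_index(drop=True, level=1)
).to_frame(name='genre')
genres_arr_dd = genres_arr_dd[genres_arr_dd['genre'] != '']

# aggregate in the workers; only the small per-genre result comes back to the client
mean_score_per_genre = (
    animelists_dd.merge(genres_arr_dd, how='inner', left_index=True, right_index=True)
                 .groupby('genre')['my_score'].mean()
                 .compute()
)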
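
And for the scheduler bullet, a sketch of starting the distributed scheduler on a single machine; the worker/thread counts are only an example for a 16-core box, not a recommendation:

from dask.distributed import Client

# a local cluster with a process/thread mix; string-heavy work is GIL-bound,
# so several worker processes usually help more than many threads
client = Client(n_workers=8, threads_per_worker=2)

print(client.dashboard_link)  # open this URL for the task-stream diagnostics

# from here on, .compute() calls in the question's code run on these worker
# processes instead of the default single-process threaded scheduler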

Upvotes: 2
