Clock Slave

Reputation: 7957

Caching Spark Dataframe for speed enhancement

I have a function that joins a list of dataframes to a base dataframe and returns a dataframe. I am trying to reduce the time this operation takes. Since I was joining the base dataframe multiple times, I cached it, but the runtime is still about the same. This is the function I am using:

from pyspark import StorageLevel

def merge_dataframes(base_df, df_list, id_col):
    """
    Joins multiple dataframes using an identifier variable common across datasets
    :param base_df: everything will be added to this dataframe
    :param df_list: dfs that have to be joined to main dataset
    :param id_col: the identifier column
    :return: dataset with all joins
    """
    base_df.persist(StorageLevel.MEMORY_AND_DISK)
    for each_df in df_list:
        base_df = base_df.join(each_df, id_col)
    base_df.unpersist()
    return base_df

I was surprised to get similar results after caching. What's the reason behind this, and what can I do to make this take less time?

Also, since the datasets I am currently using are relatively small (~50k records), I don't mind caching datasets as and when needed, as long as I unpersist them afterwards.

Upvotes: 0

Views: 447

Answers (1)

Vladislav Varslavans

Reputation: 2934

Join is a transformation, so no computation is triggered at that point.

First:

You call unpersist() before any action has run, so the cache is dropped before it can ever be used.

Try removing the unpersist() call (or moving it to after an action) and see what happens.

Second:

I'm afraid that in your case you can't benefit from persistence, because what your code does is the same as:

base_df.join(df1, id_col).join(df2, id_col).join(df3, id_col)...

In that chain, base_df is computed only once, and afterwards only the result of each base_df.join() is used further down the chain. That means base_df is never reused.

Here is an example where it would be reused:

base_df.join(df1, id_col)
base_df.join(df2, id_col)

But that does not fit your requirements. Depending on how base_df and the dataframes in df_list are created, you might want to consider pre-partitioning all of them with the same partitioner. In that case the join operation will not cause a shuffle, which will greatly improve performance.

Another way is to perform a broadcast join, if the dataframes in df_list are relatively small.

Upvotes: 2
