DarkDrassher34

Reputation: 59

Make groupby.apply more efficient or convert to spark

All,

I am using pandas groupby.apply with my own custom function, but I have noticed that it is very, very slow. Can someone help me convert this code to run on Spark DataFrames?

Adding a simple example for people to work with:

import pandas as pd
import operator

df = pd.DataFrame({
    'Instruments': ['A', 'B', 'A', 'B', 'A', 'C', 'C', 'B'],
    'Sers': ['Wind', 'Tool', 'Wind', 'Wind', 'Tool', 'Tool', 'Tool', 'Wind'],
    'Sounds': [42, 21, 34, 56, 43, 61, 24, 23]
})
def get_stats(data_frame):
    # For each grouped data_frame, drop all Sounds above the 99th percentile
    cutoff_99 = data_frame[data_frame.Sounds <= data_frame.Sounds.quantile(.99)]

    # Based on the total number of records, select the most abundant Sers
    sers_to_use = max((cutoff_99.Sers.value_counts() / cutoff_99.shape[0]).to_dict().items(), key=operator.itemgetter(1))[0]

    # Give me the average sound of the selected sers
    avg_sounds_of_sers_to_use = cutoff_99.loc[cutoff_99["Sers"] == sers_to_use].Sounds.mean()

    # Initialize result lists
    cool = []
    mean_sounds = []
    ratios = []
    _difference = []

    for i in cutoff_99.Sers.unique():
        # add each unique sers of that dataframe 
        cool.append(i) 

        # get the mean sound of that ser
        sers_mean_sounds = (cutoff_99.loc[cutoff_99["Sers"] == i].Sounds).mean()

        # add each mean sound for each sers
        mean_sounds.append(sers_mean_sounds) 

        # get the ratio of the sers to use vs. the current sers; add all of the ratios to the list
        ratios.append(avg_sounds_of_sers_to_use / sers_mean_sounds)

        # get the percent difference and add it to a list
        _difference.append(
            float(
                round(
                    abs(avg_sounds_of_sers_to_use - sers_mean_sounds)
                    / ((avg_sounds_of_sers_to_use + sers_mean_sounds) / 2),
                    2,
                )
                * 100
            )
        )

    # return a series with these lists/values.
    return pd.Series({
        'Cools': cool,
        'Chosen_Sers': sers_to_use,
        'Average_Sounds_99_Percent': mean_sounds,
        'Mean_Ratios': ratios,
        'Percent_Differences': _difference
    }) 

I call the function as follows in pandas: df.groupby('Instruments').apply(get_stats)

Upvotes: 3

Views: 298

Answers (2)

cronoik

Reputation: 19320

You can achieve everything with pyspark and window functions as shown below.

Creating a pyspark dataframe:

import pandas as pd
from pyspark.sql import SparkSession

# Assumes a running Spark session; create one if it is not already available
spark = SparkSession.builder.getOrCreate()

data = {
    'Instruments': ['A', 'B', 'A', 'B', 'A', 'C', 'C', 'B'],
    'Sers': ['Wind', 'Tool', 'Wind', 'Wind', 'Tool', 'Tool', 'Tool', 'Wind'],
    'Sounds': [42, 21, 34, 56, 43, 61, 24, 23]
}

pddf = pd.DataFrame(data)

df = spark.createDataFrame(pddf)
df.show()

Output:

+-----------+----+------+
|Instruments|Sers|Sounds|
+-----------+----+------+
|          A|Wind|    42|
|          B|Tool|    21|
|          A|Wind|    34|
|          B|Wind|    56|
|          A|Tool|    43|
|          C|Tool|    61|
|          C|Tool|    24|
|          B|Wind|    23|
+-----------+----+------+

The calculations:

from pyspark.sql import Window
from pyspark.sql import functions as F

wI = Window.partitionBy('Instruments')
wIS = Window.partitionBy('Instruments', 'Sers')

df = df.withColumn('q', F.expr('percentile_approx(Sounds, 0.99)').over(wI))
# Strict < (rather than <=) because percentile_approx returns an actual group
# value, while pandas' quantile(.99) interpolates just below the maximum
df = df.filter(df.Sounds < df.q)

# This count is our marker for Chosen_Sers:
# a higher number indicates the most abundant Sers
df = df.withColumn('tmpCount', F.count('Sers').over(wIS))
# This is the most abundant Sers as a string
df = df.withColumn('Chosen_Sers', F.first('Sers').over(wI.orderBy(F.desc('tmpCount'))))

# Mean sound for each Sers within an instrument
df = df.withColumn('Average_Sounds_99_Percent', F.mean('Sounds').over(wIS))
# Mean sound of the chosen Sers
df = df.withColumn('avg_sounds_of_sers_to_use', F.first(F.col('Average_Sounds_99_Percent')).over(wI.orderBy(F.desc('tmpCount'))))

# Ratio of the chosen Sers' mean sound to each Sers' mean sound
df = df.withColumn('mean_ratios', F.col('avg_sounds_of_sers_to_use')/F.mean('Sounds').over(wIS))

# Percent difference between the chosen Sers' mean and each Sers' mean
df = df.withColumn(
    'percent_differences',
    100 * F.round(
        F.abs(F.col('avg_sounds_of_sers_to_use') - F.col('Average_Sounds_99_Percent'))
        / ((F.col('avg_sounds_of_sers_to_use') + F.col('Average_Sounds_99_Percent')) / 2),
        2
    )
)

# Up to this point we have a flat table
df.show()

# Now we create the desired structure and drop all unneeded columns
df.dropDuplicates(['Instruments', 'Sers']).groupby('Instruments', 'Chosen_Sers').agg(
    F.collect_list('Sers').alias('Sers'),
    F.collect_list('Average_Sounds_99_Percent').alias('Average_Sounds_99_Percent'),
    F.collect_list('mean_ratios').alias('mean_ratios'),
    F.collect_list('percent_differences').alias('percent_differences')
).show(truncate=False)

Output:

#just a flat table
+-----------+----+------+---+--------+-----------+-------------------------+-------------------------+------------------+-------------------+
|Instruments|Sers|Sounds|  q|tmpCount|Chosen_Sers|Average_Sounds_99_Percent|avg_sounds_of_sers_to_use|       mean_ratios|percent_differences|
+-----------+----+------+---+--------+-----------+-------------------------+-------------------------+------------------+-------------------+
|          B|Tool|    21| 56|       1|       Tool|                     21.0|                     21.0|               1.0|                0.0|
|          B|Wind|    23| 56|       1|       Tool|                     23.0|                     21.0|0.9130434782608695|                9.0|
|          C|Tool|    24| 61|       1|       Tool|                     24.0|                     24.0|               1.0|                0.0|
|          A|Wind|    42| 43|       2|       Wind|                     38.0|                     38.0|               1.0|                0.0|
|          A|Wind|    34| 43|       2|       Wind|                     38.0|                     38.0|               1.0|                0.0|
+-----------+----+------+---+--------+-----------+-------------------------+-------------------------+------------------+-------------------+
#desired structure
+-----------+-----------+------------+-------------------------+-------------------------+-------------------+
|Instruments|Chosen_Sers|Sers        |Average_Sounds_99_Percent|mean_ratios              |percent_differences|
+-----------+-----------+------------+-------------------------+-------------------------+-------------------+
|B          |Tool       |[Tool, Wind]|[21.0, 23.0]             |[1.0, 0.9130434782608695]|[0.0, 9.0]         |
|C          |Tool       |[Tool]      |[24.0]                   |[1.0]                    |[0.0]              |
|A          |Wind       |[Wind]      |[38.0]                   |[1.0]                    |[0.0]              |
+-----------+-----------+------------+-------------------------+-------------------------+-------------------+
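
As an alternative that answers the question more literally, Spark 3's applyInPandas runs a pandas function once per group, so the logic can stay very close to the original get_stats. A minimal sketch, assuming get_stats is reshaped into a hypothetical get_stats_flat that returns a flat pandas DataFrame with one row per Sers (the function name and the schema string are assumptions, not code from this answer):

import pandas as pd

def get_stats_flat(pdf):
    # Same logic as get_stats, but one row per Sers and plain columns,
    # so the output fits a fixed Spark schema (hypothetical reconstruction)
    cutoff_99 = pdf[pdf.Sounds <= pdf.Sounds.quantile(.99)]
    sers_to_use = cutoff_99.Sers.value_counts().idxmax()
    avg_chosen = cutoff_99.loc[cutoff_99.Sers == sers_to_use, 'Sounds'].mean()
    means = cutoff_99.groupby('Sers').Sounds.mean()
    return pd.DataFrame({
        'Instruments': pdf.Instruments.iloc[0],
        'Cools': means.index,
        'Chosen_Sers': sers_to_use,
        'Average_Sounds_99_Percent': means.values,
        'Mean_Ratios': avg_chosen / means.values,
        'Percent_Differences': (abs(avg_chosen - means.values)
                                / ((avg_chosen + means.values) / 2)).round(2) * 100,
    })

schema = ('Instruments string, Cools string, Chosen_Sers string, '
          'Average_Sounds_99_Percent double, Mean_Ratios double, '
          'Percent_Differences double')

# pddf is the pandas frame from above; Spark ships each Instruments
# group to get_stats_flat and processes the groups in parallel
spark.createDataFrame(pddf).groupby('Instruments') \
    .applyInPandas(get_stats_flat, schema=schema).show()

This keeps the per-group logic in pandas while letting Spark parallelize across groups, which is usually the most direct port of a slow groupby.apply.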

Upvotes: 2

mcskinner

Reputation: 2748

You can do everything you want within pandas.

Here is one example of how to reorganize your working code with the groupby functionality.

First, I made a small change to your code so that get_stats returns a DataFrame instead of a Series (a sketch of that change follows the output below). I also changed the calling code to look like this:

df.groupby('Instruments').apply(get_stats).reset_index().drop(columns='level_1')
#   Instruments Cools Chosen_Sers  Average_Sounds_99_Percent  Mean_Ratios  Percent_Differences
# 0           A  Wind        Wind                       38.0     1.000000                  0.0
# 1           B  Tool        Wind                       21.0     1.095238                  9.0
# 2           B  Wind        Wind                       23.0     1.000000                  0.0
# 3           C  Tool        Tool                       24.0     1.000000                  0.0
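
The modified get_stats is not shown above, but the change amounts to swapping the final pd.Series for pd.DataFrame so that each list becomes a column (a sketch of the likely change, not the answer's verbatim code):

    # In get_stats, replace the final return pd.Series({...}) with:
    return pd.DataFrame({
        'Cools': cool,
        'Chosen_Sers': sers_to_use,
        'Average_Sounds_99_Percent': mean_sounds,
        'Mean_Ratios': ratios,
        'Percent_Differences': _difference
    })

Because get_stats now returns one row per Sers, groupby('Instruments').apply produces a MultiIndex, which is why the reset_index() and the drop of level_1 are needed in the call above.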

Then it is possible to rewrite your code to process all Instruments at once, and all Sers at once within that.

# Compute 99th percentile Sounds by Instruments, filter the data.
quantile99 = df.groupby('Instruments')['Sounds'].quantile(.99)
cutoff_99 = df.merge(quantile99, on='Instruments', suffixes=['', '99'])
cutoff_99 = cutoff_99[cutoff_99['Sounds'] <= cutoff_99['Sounds99']].drop(columns='Sounds99')

# Find the most common Sers for each Instrument.
ser_counts = cutoff_99.groupby('Instruments').Sers.value_counts().sort_values(ascending=False)
sers_to_use = ser_counts.reset_index(name='n').drop_duplicates('Instruments').drop(columns='n')
filtered_sers = cutoff_99.merge(sers_to_use, on=['Instruments', 'Sers'])

# Compute the average Sounds for the most common Sers, and each Sers.
avg_sounds = filtered_sers.groupby('Instruments').mean()
sers_mean_sounds = cutoff_99.groupby(['Instruments', 'Sers']).mean()

# Compute derived values. This works because of the shared Instruments index.
difference = abs(avg_sounds - sers_mean_sounds) / ((avg_sounds + sers_mean_sounds) / 2)
ratios = avg_sounds / sers_mean_sounds

# Rename everything, merge the data together into your desired format.
chosen_sers_df = sers_to_use.rename(columns={'Sers': 'Chosen_Sers'})
sers_mean_df = sers_mean_sounds.reset_index().rename(columns={'Sounds': 'Average_Sounds_99_Percent'})
ratios_df = ratios.reset_index().rename(columns={'Sounds': 'Mean_Ratios'})
difference_df = (difference * 100).round().reset_index().rename(columns={'Sounds': 'Percent_Differences'})
result = chosen_sers_df.merge(sers_mean_df).merge(ratios_df).merge(difference_df).rename(columns={'Sers': 'Cools'})

The code above could likely be cleaned up and streamlined further, but it produces the desired result, with a small amount of reordering due to the implementation change.

result
#   Instruments Chosen_Sers Cools  Average_Sounds_99_Percent  Mean_Ratios  Percent_Differences
# 0           A        Wind  Wind                         38     1.000000                  0.0
# 1           C        Tool  Tool                         24     1.000000                  0.0
# 2           B        Wind  Tool                         21     1.095238                  9.0
# 3           B        Wind  Wind                         23     1.000000                  0.0
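
If the ordering matters, sorting on the group keys lines the rows back up with the groupby.apply output (a small usage sketch; sort_values and reset_index are standard pandas):

result.sort_values(['Instruments', 'Cools']).reset_index(drop=True)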

Upvotes: 1
