Reputation: 59
All,
I am using pandas groupby.apply with my own custom function. However, I have noticed that the function is very, very slow. Can someone help me convert this code so that it works on Spark dataframes?
Here is a simple example for people to work with:
import pandas as pd
import operator

df = pd.DataFrame({
    'Instruments': ['A', 'B', 'A', 'B', 'A', 'C', 'C', 'B'],
    'Sers': ['Wind', 'Tool', 'Wind', 'Wind', 'Tool', 'Tool', 'Tool', 'Wind'],
    'Sounds': [42, 21, 34, 56, 43, 61, 24, 23]
})

def get_stats(data_frame):
    # For each grouped data_frame, cut off all Sounds greater than the 99th percentile
    cutoff_99 = data_frame[data_frame.Sounds <= data_frame.Sounds.quantile(.99)]
    # Based on the total number of records, select the most-abundant Sers
    sers_to_use = max(
        (cutoff_99.Sers.value_counts() / cutoff_99.shape[0]).to_dict().items(),
        key=operator.itemgetter(1)
    )[0]
    # Give me the average Sound of the selected Sers
    avg_sounds_of_sers_to_use = cutoff_99.loc[cutoff_99["Sers"] == sers_to_use].Sounds.mean()
    # Pre-allocate lists
    cool = []
    mean_sounds = []
    ratios = []
    _difference = []
    for i in cutoff_99.Sers.unique():
        # add each unique Sers of that dataframe
        cool.append(i)
        # get the mean Sound of that Sers
        sers_mean_sounds = (cutoff_99.loc[cutoff_99["Sers"] == i].Sounds).mean()
        # add each mean Sound for each Sers
        mean_sounds.append(sers_mean_sounds)
        # get the ratio of the Sers to use vs. the current Sers; add all of the ratios to the list
        ratios.append(avg_sounds_of_sers_to_use / sers_mean_sounds)
        # get the percent difference and add it to the list
        _difference.append(
            float(
                round(
                    abs(avg_sounds_of_sers_to_use - sers_mean_sounds)
                    / ((avg_sounds_of_sers_to_use + sers_mean_sounds) / 2),
                    2,
                )
                * 100
            )
        )
    # return a Series with these lists/values
    return pd.Series({
        'Cools': cool,
        'Chosen_Sers': sers_to_use,
        'Average_Sounds_99_Percent': mean_sounds,
        'Mean_Ratios': ratios,
        'Percent_Differences': _difference
    })
I call the function in pandas as follows:
df.groupby('Instruments').apply(get_stats)
Upvotes: 3
Views: 298
Reputation: 19320
You can achieve everything with pyspark and window functions as shown below.
Creating a pyspark dataframe:
import pandas as pd
data = {
'Instruments': ['A', 'B', 'A', 'B', 'A', 'C', 'C', 'B'],
'Sers': ['Wind', 'Tool', 'Wind', 'Wind', 'Tool', 'Tool', 'Tool', 'Wind'],
'Sounds': [42, 21, 34, 56, 43, 61, 24, 23]
}
pddf = pd.DataFrame(data)
df = spark.createDataFrame(pddf)
df.show()
Output:
+-----------+----+------+
|Instruments|Sers|Sounds|
+-----------+----+------+
| A|Wind| 42|
| B|Tool| 21|
| A|Wind| 34|
| B|Wind| 56|
| A|Tool| 43|
| C|Tool| 61|
| C|Tool| 24|
| B|Wind| 23|
+-----------+----+------+
The calculations:
from pyspark.sql import Window
from pyspark.sql import functions as F
wI = Window.partitionBy('Instruments')
wIS = Window.partitionBy('Instruments', 'Sers')
df = df.withColumn('q', F.expr('percentile_approx(Sounds, 0.99)').over(wI))
df = df.filter(df.Sounds < df.q)
#this is our marker for Chosen_Sers
#a higher number indicates that this is the most-abundant sers
df = df.withColumn('tmpCount', F.count('Sers').over(wIS))
#this is the most-abundant sers as string
df = df.withColumn('Chosen_Sers', F.first('Sers').over(wI.orderBy(F.desc('tmpCount'))))
# mean Sound for each Sers within an instrument
df = df.withColumn('Average_Sounds_99_Percent', F.mean('Sounds').over(wIS))
#mean sound of the chosen sers
df = df.withColumn('avg_sounds_of_sers_to_use', F.first(F.col('Average_Sounds_99_Percent')).over(wI.orderBy(F.desc('tmpCount'))))
df = df.withColumn('mean_ratios', F.col('avg_sounds_of_sers_to_use')/F.mean('Sounds').over(wIS))
df = df.withColumn('percent_differences', 100 * F.round(
    F.abs(F.col('avg_sounds_of_sers_to_use') - F.col('Average_Sounds_99_Percent'))
    / ((F.col('avg_sounds_of_sers_to_use') + F.col('Average_Sounds_99_Percent')) / 2), 2))
# up to this point we just have a flat table
df.show()
# now we create the desired structure and drop all unneeded columns
df.dropDuplicates(['Instruments', 'Sers']).groupby('Instruments', 'Chosen_Sers').agg(
    F.collect_list('Sers').alias('Sers'),
    F.collect_list('Average_Sounds_99_Percent').alias('Average_Sounds_99_Percent'),
    F.collect_list('mean_ratios').alias('mean_ratios'),
    F.collect_list('percent_differences').alias('percent_differences')
).show(truncate=False)
Output:
#just a flat table
+-----------+----+------+---+--------+-----------+-------------------------+-------------------------+------------------+-------------------+
|Instruments|Sers|Sounds| q|tmpCount|Chosen_Sers|Average_Sounds_99_Percent|avg_sounds_of_sers_to_use| mean_ratios|percent_differences|
+-----------+----+------+---+--------+-----------+-------------------------+-------------------------+------------------+-------------------+
| B|Tool| 21| 56| 1| Tool| 21.0| 21.0| 1.0| 0.0|
| B|Wind| 23| 56| 1| Tool| 23.0| 21.0|1.0952380952380953| 9.0|
| C|Tool| 24| 61| 1| Tool| 24.0| 24.0| 1.0| 0.0|
| A|Wind| 42| 43| 2| Wind| 38.0| 38.0| 1.0| 0.0|
| A|Wind| 34| 43| 2| Wind| 38.0| 38.0| 1.0| 0.0|
+-----------+----+------+---+--------+-----------+-------------------------+-------------------------+------------------+-------------------+
#desired structure
+-----------+-----------+------------+-------------------------+-------------------------+-------------------+
|Instruments|Chosen_Sers|Sers |Average_Sounds_99_Percent|mean_ratios |percent_differences|
+-----------+-----------+------------+-------------------------+-------------------------+-------------------+
|B |Tool |[Tool, Wind]|[21.0, 23.0] |[1.0, 0.9130434782608695]|[0.0, 9.0] |
|C |Tool |[Tool] |[24.0] |[1.0] |[0.0] |
|A |Wind |[Wind] |[38.0] |[1.0] |[0.0] |
+-----------+-----------+------------+-------------------------+-------------------------+-------------------+
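As a side note (not part of the window-function approach above): if the goal is mainly to reuse the existing pandas function with as few changes as possible, Spark 3.0+ also offers groupBy().applyInPandas, which runs a pandas function once per group. Below is only a rough sketch under those assumptions; the wrapper name get_stats_per_group is made up here, the question's get_stats must be defined in the same script, and pandas/pyarrow must be available on the executors. It still executes Python per group, so the pure window-function version above should generally scale better.
import pandas as pd

def get_stats_per_group(pdf):
    # pdf is the pandas DataFrame holding one Instruments group
    s = get_stats(pdf)  # reuse the question's function unchanged
    return pd.DataFrame({
        'Instruments': [pdf['Instruments'].iloc[0]],
        'Chosen_Sers': [s['Chosen_Sers']],
        'Cools': [s['Cools']],
        'Average_Sounds_99_Percent': [s['Average_Sounds_99_Percent']],
        'Mean_Ratios': [s['Mean_Ratios']],
        'Percent_Differences': [s['Percent_Differences']],
    })

schema = ('Instruments string, Chosen_Sers string, Cools array<string>, '
          'Average_Sounds_99_Percent array<double>, Mean_Ratios array<double>, '
          'Percent_Differences array<double>')

# use a fresh Spark DataFrame, since df was overwritten by the window steps above
sdf = spark.createDataFrame(pddf)
sdf.groupBy('Instruments').applyInPandas(get_stats_per_group, schema).show(truncate=False)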
Upvotes: 2
Reputation: 2748
You can do everything you want within pandas.
Here is one example of how to reorganize your working code with the groupby functionality.
First, I made a small change to your code so that get_stats returns a DataFrame instead of a Series. I also changed the calling code to look like this:
df.groupby('Instruments').apply(get_stats).reset_index().drop(columns='level_1')
# Instruments Cools Chosen_Sers Average_Sounds_99_Percent Mean_Ratios Percent_Differences
# 0 A Wind Wind 38.0 1.000000 0.0
# 1 B Tool Wind 21.0 1.095238 9.0
# 2 B Wind Wind 23.0 1.000000 0.0
# 3 C Tool Tool 24.0 1.000000 0.0
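For reference, that change is presumably just the final return of get_stats: build a pd.DataFrame from the same lists instead of a pd.Series, so that each Sers becomes its own row (pandas broadcasts the scalar sers_to_use across the list-valued columns). A minimal sketch of that behaviour, with placeholder values standing in for the lists built inside the function:
import pandas as pd

cool = ['Tool', 'Wind']        # cutoff_99.Sers.unique() for one group
sers_to_use = 'Wind'           # the most-abundant Sers (a scalar)
mean_sounds = [21.0, 23.0]     # mean Sound per Sers
ratios = [23.0 / 21.0, 1.0]    # chosen-Sers mean / per-Sers mean
_difference = [9.0, 0.0]       # percent differences

# Returning this instead of a pd.Series gives one row per Sers;
# the scalar sers_to_use is broadcast down the list-valued columns.
pd.DataFrame({
    'Cools': cool,
    'Chosen_Sers': sers_to_use,
    'Average_Sounds_99_Percent': mean_sounds,
    'Mean_Ratios': ratios,
    'Percent_Differences': _difference
})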
Then it is possible to rewrite your code to process all Instruments at once, and all Sers at once within that.
# Compute 99th percentile Sounds by Instruments, filter the data.
quantile99 = df.groupby('Instruments')['Sounds'].quantile(.99)
cutoff_99 = df.merge(quantile99, on='Instruments', suffixes=['', '99'])
cutoff_99 = cutoff_99[cutoff_99['Sounds'] <= cutoff_99['Sounds99']].drop(columns='Sounds99')
# Find the most common Sers for each Instrument.
ser_counts = cutoff_99.groupby('Instruments').Sers.value_counts().sort_values(ascending=False)
sers_to_use = ser_counts.reset_index(name='n').drop_duplicates('Instruments').drop(columns='n')
filtered_sers = cutoff_99.merge(sers_to_use, on=['Instruments', 'Sers'])
# Compute the average Sounds for the most common Sers, and each Sers.
avg_sounds = filtered_sers.groupby('Instruments').mean()
sers_mean_sounds = cutoff_99.groupby(['Instruments', 'Sers']).mean()
# Compute derived values. This works because of the shared Instruments index.
difference = abs(avg_sounds - sers_mean_sounds) / ((avg_sounds + sers_mean_sounds) / 2)
ratios = avg_sounds / sers_mean_sounds
# Rename everything, merge the data together into your desired format.
chosen_sers_df = sers_to_use.rename(columns={'Sers': 'Chosen_Sers'})
sers_mean_df = sers_mean_sounds.reset_index().rename(columns={'Sounds': 'Average_Sounds_99_Percent'})
ratios_df = ratios.reset_index().rename(columns={'Sounds': 'Mean_Ratios'})
difference_df = (difference * 100).round().reset_index().rename(columns={'Sounds': 'Percent_Differences'})
result = chosen_sers_df.merge(sers_mean_df).merge(ratios_df).merge(difference_df).rename(columns={'Sers': 'Cools'})
The code above could likely be cleaned up and streamlined further, but it produces the desired result, with a small amount of reordering due to the change in implementation.
result
# Instruments Chosen_Sers Cools Average_Sounds_99_Percent Mean_Ratios Percent_Differences
# 0 A Wind Wind 38 1.000000 0.0
# 1 C Tool Tool 24 1.000000 0.0
# 2 B Wind Tool 21 1.095238 9.0
# 3 B Wind Wind 23 1.000000 0.0
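If a deterministic row order is preferred, the result can simply be sorted at the end, for example:
# optional: give the result a stable row order for easier comparison
result = result.sort_values(['Instruments', 'Cools']).reset_index(drop=True)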
Upvotes: 1