Reputation: 34547
I am analysing some data with PySpark DataFrames. Suppose I have a DataFrame df
that I am aggregating:
(df.groupBy("group")
.agg({"money":"sum"})
.show(100)
)
This will give me:
group   SUM(money#2L)
A       137461285853
B       172185566943
C       271179590646
The aggregation works just fine, but I dislike the new column name SUM(money#2L). Is there a way to rename this column into something human-readable from the .agg method? Maybe something more similar to what one would do in dplyr:
df %>% group_by(group) %>% summarise(sum_money = sum(money))
Upvotes: 113
Views: 153132
Reputation: 3440
This is late to the party, but I think it's a more general solution than hardcoding a specific function and safer than trying to rename after the fact.
import pyspark.sql.functions as sf

y = {"money": "sum"}
df.groupBy("group").agg(
    *[getattr(sf, fun)(col).alias(col) for col, fun in y.items()]
)
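For instance, with a dictionary covering several columns, the same pattern keeps every output column named after its input (the clicks column and the avg function below are purely illustrative):

import pyspark.sql.functions as sf

# Hypothetical aggregation spec: column name -> function name from pyspark.sql.functions
aggs = {"money": "sum", "clicks": "avg"}

result = df.groupBy("group").agg(
    *[getattr(sf, fun)(col).alias(col) for col, fun in aggs.items()]
)
result.show()  # columns come out as 'group', 'money', 'clicks'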
Upvotes: 0
Reputation: 2415
[Special case]
If we want to rename the aggregated columns with the same name as the columns being summed over (i.e. sum(column1) --> column1), we can do it like so:
import pyspark.sql.functions as F
groupby_keys = ["categorical_column_1", "categorical_column_2"]
numerical_columns = ["numerical_column_1", "numerical_column_2"]
aggregation_computations = [F.sum(col).alias(col) for col in numerical_columns]
df = df.groupby(groupby_keys).agg(*aggregation_computations)
df.show()
+----------------------+----------------------+--------------------+--------------------+
| categorical_column_1 | categorical_column_2 | numerical_column_1 | numerical_column_2 |
+----------------------+----------------------+--------------------+--------------------+
| category_1_1 | category_2_1 | 1 | 1.0 |
| category_1_2 | category_2_1 | 2 | 2.0 |
| category_1_1 | category_2_2 | 3 | 3.0 |
| category_1_2 | category_2_2 | 4 | 4.0 |
+----------------------+----------------------+--------------------+--------------------+
Upvotes: 0
Reputation: 3542
.alias and .withColumnRenamed both work if you're willing to hard-code your column names. If you need a programmatic solution, e.g. friendlier names for an aggregation of all remaining columns, this provides a good starting point:
import pyspark.sql.functions as F

grouping_column = 'group'
cols = [F.sum(F.col(x)).alias(x) for x in df.columns if x != grouping_column]

(
    df
    .groupBy(grouping_column)
    .agg(*cols)
)
Upvotes: 6
Reputation: 495
Another quick little one-liner to add to the mix:

from pyspark.sql.functions import col

agg_df = df.groupBy('group').agg({'money': 'sum',
                                  'moreMoney': 'sum',
                                  'evenMoreMoney': 'sum'})
agg_df = agg_df.select(*(col(c).alias(c.replace('(', '_').replace(')', '')) for c in agg_df.columns))

Just change the alias expression to whatever you'd like to name them. The above generates sum_money, sum_moreMoney, etc., since I do like seeing the operator in the variable name.
Upvotes: 0
Reputation: 34547
Although I still prefer dplyr syntax, this code snippet will do:
import pyspark.sql.functions as sf
(df.groupBy("group")
.agg(sf.sum('money').alias('money'))
.show(100))
It gets verbose.
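For instance, with a few more aggregations the pattern simply repeats (the extra aggregations below are purely illustrative):

import pyspark.sql.functions as sf

(df.groupBy("group")
 .agg(sf.sum('money').alias('money'),
      sf.avg('money').alias('avg_money'),
      sf.count('*').alias('n'))
 .show(100))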
Upvotes: 208
Reputation: 2455
While the previously given answers are good, I think they're lacking a neat way to deal with dictionary usage in .agg().
If you want to use a dict, which might also be generated dynamically because you have hundreds of columns, you can use the following without dealing with dozens of lines of code:
# Your dictionary-version of using the .agg()-function
# Note: The provided logic could actually also be applied to a non-dictionary approach
df = df.groupBy("group")\
.agg({
"money":"sum"
, "...": "..."
})
# Now do the renaming
newColumnNames = ["group", "money", "..."] # Provide the names for ALL columns of the new df
df = df.toDF(*newColumnNames) # Do the renaming
Of course the newColumnNames list can also be generated dynamically. E.g., if you only append columns from the aggregation to your df, you can pre-store newColumnNames = df.columns and then just append the additional names; a sketch follows below. Anyhow, be aware that newColumnNames must contain all column names of the DataFrame, not only those to be renamed (because .toDF() creates a new DataFrame due to Spark's immutable RDDs)!
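A minimal sketch of that idea, assuming df only has group, money and clicks columns (clicks and the avg aggregation are illustrative), and deriving the new names from the aggregated frame's actual column order rather than assuming the dict order is preserved:

agg_df = df.groupBy("group").agg({"money": "sum", "clicks": "avg"})

# Strip the 'func(...)' wrapper from aggregated columns, keep the rest unchanged
newColumnNames = [c[c.find("(") + 1:-1] if "(" in c else c for c in agg_df.columns]
agg_df = agg_df.toDF(*newColumnNames)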
Upvotes: 3
Reputation: 641
I made a little helper function for this that might help some people out.
import re
from functools import partial

def rename_cols(agg_df, ignore_first_n=1):
    """Changes the default Spark aggregate names like `avg(colname)`
    to something a bit more useful. Pass an aggregated dataframe
    and the number of grouping columns to ignore.
    """
    delimiters = "(", ")"
    split_pattern = '|'.join(map(re.escape, delimiters))
    splitter = partial(re.split, split_pattern)
    # 'avg(colname)' -> ['avg', 'colname', ''] -> 'avg_colname'
    split_agg = lambda x: '_'.join(splitter(x)).rstrip('_')
    renamed = map(split_agg, agg_df.columns[ignore_first_n:])
    renamed = zip(agg_df.columns[ignore_first_n:], renamed)
    for old, new in renamed:
        agg_df = agg_df.withColumnRenamed(old, new)
    return agg_df
An example:
gb = (df.selectExpr("id", "rank", "rate", "price", "clicks")
.groupby("id")
.agg({"rank": "mean",
"*": "count",
"rate": "mean",
"price": "mean",
"clicks": "mean",
})
)
>>> gb.columns
['id',
'avg(rate)',
'count(1)',
'avg(price)',
'avg(rank)',
'avg(clicks)']
>>> rename_cols(gb).columns
['id',
'avg_rate',
'count_1',
'avg_price',
'avg_rank',
'avg_clicks']
It at least does a bit to save people from typing so much.
Upvotes: 8
Reputation: 188
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession.builder.appName('test').getOrCreate()
data = [(1, "siva", 100), (2, "siva2", 200),(3, "siva3", 300),(4, "siva4", 400),(5, "siva5", 500)]
schema = ['id', 'name', 'sallary']
df = spark.createDataFrame(data, schema=schema)
df.show()
+---+-----+-------+
| id| name|sallary|
+---+-----+-------+
| 1| siva| 100|
| 2|siva2| 200|
| 3|siva3| 300|
| 4|siva4| 400|
| 5|siva5| 500|
+---+-----+-------+
**df.agg({"sallary": "max"}).withColumnRenamed('max(sallary)', 'max').show()**
+---+
|max|
+---+
|500|
+---+
Upvotes: 5
Reputation: 638
It's as simple as:
val maxVideoLenPerItemDf = requiredItemsFiltered.groupBy("itemId").agg(max("playBackDuration").as("customVideoLength"))
maxVideoLenPerItemDf.show()
Use .as
in agg to name the new row created.
Upvotes: 7
Reputation: 397
df = df.groupby('Device_ID').agg(aggregate_methods)

for column in df.columns:
    start_index = column.find('(')
    end_index = column.find(')')
    if start_index != -1 and end_index != -1:
        df = df.withColumnRenamed(column, column[start_index + 1:end_index])
The above code strips out everything outside the parentheses from each aggregated column name. For example, "sum(foo)" will be renamed to "foo".
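Here aggregate_methods stands for whatever aggregation spec you pass to .agg(); a hypothetical example of what it might look like:

# Purely illustrative; any dict accepted by .agg() works here
aggregate_methods = {"foo": "sum", "bar": "avg"}

# With the loop above, columns such as ['Device_ID', 'sum(foo)', 'avg(bar)']
# end up as ['Device_ID', 'foo', 'bar'].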
Upvotes: 4
Reputation: 9825
withColumnRenamed should do the trick; see the pyspark.sql API documentation for details.
df.groupBy("group")\
.agg({"money":"sum"})\
.withColumnRenamed("SUM(money)", "money")
.show(100)
Upvotes: 94