Reputation: 1712
I am facing strange behavior when using the alias() function to rename columns within agg() after pivoting. This code works:
import pyspark.sql.functions as F
tst = sqlContext.createDataFrame([(1,2,3,4),(3,2,5,4),(5,3,7,5),(7,3,9,5)], schema=['col1','col2','col3','col4'])
chk = tst.groupby('col1').pivot('col2').agg(F.sum('col3').alias('sum'), F.mean('col3').alias('mean'))
When I check the columns of this DataFrame, the names are as expected:
chk.columns
Out[54]: ['col1', '2_sum', '2_mean', '3_sum', '3_mean']
But when there is just a single aggregation after the pivot, the renaming does not work.
import pyspark.sql.functions as F
#Test data
tst = sqlContext.createDataFrame([(1,2,3,4),(3,2,5,4),(5,3,7,5),(7,3,9,5)],schema=['col1','col2','col3','col4'])
chk = tst.groupby('col1').pivot('col2').agg(F.sum('col3').alias('sum'))
Now, when I check the columns, the renaming has not taken effect:
chk.columns
Out[56]: ['col1', '2', '3']
Is this expected behavior in Spark? Am I missing something?
Upvotes: 0
Views: 336
Reputation: 6338
You may want to look at the Spark source code for the Pivot logical plan:
override def output: Seq[Attribute] = {
  val pivotAgg = aggregates match {
    case agg :: Nil =>
      pivotValues.map(value => AttributeReference(value.toString, agg.dataType)())
    case _ =>
      pivotValues.flatMap { value =>
        aggregates.map(agg => AttributeReference(value + "_" + agg.sql, agg.dataType)())
      }
  }
  groupByExprsOpt.getOrElse(Seq.empty).map(_.toAttribute) ++ pivotAgg
}
You can observe that the output columns are not suffixed with agg.sql when there is only one aggregate expression after the pivot. With a single aggregate expression, the output attribute is named using just the pivot value:
pivotValues.map(value => AttributeReference(value.toString, agg.dataType)())
Conclusion: this behaviour is expected, not strange.
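If the suffix is still needed with a single aggregation, a possible workaround (a minimal sketch, not from the Spark source; the renaming logic is my own suggestion) is to add it yourself after the pivot:
import pyspark.sql.functions as F

tst = sqlContext.createDataFrame([(1,2,3,4),(3,2,5,4),(5,3,7,5),(7,3,9,5)], schema=['col1','col2','col3','col4'])
chk = tst.groupby('col1').pivot('col2').agg(F.sum('col3').alias('sum'))

# Rebuild the column list: keep the group-by key as-is and append the
# intended '_sum' suffix to every pivoted value column.
group_cols = ['col1']
renamed = [c if c in group_cols else F.col(c).alias(c + '_sum') for c in chk.columns]
chk_renamed = chk.select(renamed)
chk_renamed.columns
# ['col1', '2_sum', '3_sum']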
Upvotes: 1