Reputation: 1135
I have a Spark DataFrame with columns like so:
df
--------------------------
A B C D E F amt
"A1" "B1" "C1" "D1" "E1" "F1" 1
"A2" "B2" "C2" "D2" "E2" "F2" 2
I would like to perform groupBy with column combinations
(A, B, sum(amt))
(A, C, sum(amt))
(A, D, sum(amt))
(A, E, sum(amt))
(A, F, sum(amt))
such that the resulting data frame looks like:
df_grouped
----------------------
A field value amt
"A1" "B" "B1" 1
"A2" "B" "B2" 2
"A1" "C" "C1" 1
"A2" "C" "C2" 2
"A1" "D" "D1" 1
"A2" "D" "D2" 2
My attempt at this was the following:
val cols = Vector("B", "C", "D", "E", "F")
// code for creating an empty data frame (declared as a var) with the columns A, field, value and amt
for (c <- cols) {
  empty_df = empty_df.union(
    df.groupBy($"A", col(c))
      .agg(sum("amt").as("amt"))
      .withColumn("field", lit(c))
      .withColumnRenamed(c, "value")
      // union matches columns by position, so align the order with empty_df
      .select("A", "field", "value", "amt"))
}
I feel that using "for" or "foreach" may be clumsy in a distributed environment such as Spark. Are there any alternatives with map functionality for what I am doing? In my mind, aggregateByKey and collect_list may work; however, I am unable to picture a complete solution. Please advise.
Upvotes: 1
Views: 2094
Reputation: 41987
foldLeft is a very powerful function in Scala if you know how to use it. I suggest using foldLeft here (I have commented the code for clarity and explanation):
// selecting the columns other than A and amt
val columnsForAggregation = df.columns.tail.toSet - "amt"
// creating a dataframe with one dummy row (the format of the final output)
// toDF needs spark.implicits._ in scope (assuming the session is named spark)
val finalDF = Seq(("empty", "empty", "empty", 0.0)).toDF("A", "field", "value", "amt")
// using foldLeft for the aggregation and merging each aggregated result
import org.apache.spark.sql.functions._
val (originaldf, transformeddf) = columnsForAggregation.foldLeft((df, finalDF)) { (tempdf, column) =>
  // aggregating on A and one of the columns, then selecting the columns as required in the output
  val aggregatedf = tempdf._1.groupBy("A", column).agg(sum("amt").as("amt"))
    .select(col("A"), lit(column).as("field"), col(column).as("value"), col("amt"))
  // union the aggregated result and pass both dataframes on to the next iteration
  (df, tempdf._2.union(aggregatedf))
}
// finally removing the dummy row created
transformeddf.filter(col("A") =!= "empty")
  .show(false)
You should get the dataframe you want:
+---+-----+-----+---+
|A |field|value|amt|
+---+-----+-----+---+
|A1 |E |E1 |1.0|
|A2 |E |E2 |2.0|
|A1 |F |F1 |1.0|
|A2 |F |F2 |2.0|
|A2 |B |B2 |2.0|
|A1 |B |B1 |1.0|
|A2 |C |C2 |2.0|
|A1 |C |C1 |1.0|
|A1 |D |D1 |1.0|
|A2 |D |D2 |2.0|
+---+-----+-----+---+
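The accumulator threading that foldLeft performs above can be seen without Spark. The following is only a minimal sketch on plain Scala collections: `FoldLeftSketch`, `records` and `aggregate` are hypothetical names, and the in-memory `records` value is an assumed stand-in for the dataframe, limited to columns B and C for brevity.

```scala
object FoldLeftSketch {
  // One output row: (A, field, value, amt)
  type Row = (String, String, String, Int)

  // Hypothetical in-memory stand-in for the dataframe:
  // each record pairs a column-name -> value map with its amt.
  val records: Seq[(Map[String, String], Int)] = Seq(
    (Map("A" -> "A1", "B" -> "B1", "C" -> "C1"), 1),
    (Map("A" -> "A2", "B" -> "B2", "C" -> "C2"), 2)
  )

  // Group by (A, column) and sum amt for a single column.
  def aggregate(column: String): Seq[Row] =
    records
      .groupBy { case (rec, _) => (rec("A"), rec(column)) }
      .map { case ((a, v), rows) => (a, column, v, rows.map(_._2).sum) }
      .toSeq
      .sortBy(_._1)

  // foldLeft starts from an empty accumulator and appends
  // the aggregate of one column per step.
  val result: Seq[Row] =
    Seq("B", "C").foldLeft(Seq.empty[Row]) { (acc, column) =>
      acc ++ aggregate(column)
    }

  def main(args: Array[String]): Unit = result.foreach(println)
}
```

Each step receives the accumulator built so far plus the next column name and returns the new accumulator, which is the same shape as the `(tempdf, column)` function used with the dataframes.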
I hope the answer is helpful
A more concise form of the foldLeft above is:
import org.apache.spark.sql.functions._
val (originaldf, transformeddf) = columnsForAggregation.foldLeft((df, finalDF)) { (tempdf, column) =>
  (df, tempdf._2.union(
    tempdf._1.groupBy("A", column).agg(sum("amt").as("amt"))
      .select(col("A"), lit(column).as("field"), col(column).as("value"), col("amt"))))
}
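As a possible variation, the dummy row and the final filter can be avoided by mapping each column to its aggregated dataframe and then reducing the results with union. This is a sketch, not a tested drop-in replacement: `GroupByEachColumn` and `aggregatePerColumn` are hypothetical names, the local SparkSession is an assumption, and it assumes the list of columns is non-empty and that all of them have the same type so the union is valid. Note that the map, like a for loop or a fold, only runs on the driver to build the query plan; the grouping itself is still executed by Spark in a distributed way.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit, sum}

object GroupByEachColumn {
  // For every column in `fields`: group by `key` and that column, sum `amt`,
  // reshape to (key, field, value, amt), then union all the pieces.
  def aggregatePerColumn(df: DataFrame, key: String, fields: Seq[String], amt: String): DataFrame =
    fields
      .map { c =>
        df.groupBy(col(key), col(c))
          .agg(sum(col(amt)).as(amt))
          .select(col(key), lit(c).as("field"), col(c).as("value"), col(amt))
      }
      .reduce(_ union _) // assumption: `fields` is non-empty

  def main(args: Array[String]): Unit = {
    // Assumed local session, for illustration only.
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    import spark.implicits._

    val df = Seq(
      ("A1", "B1", "C1", "D1", "E1", "F1", 1),
      ("A2", "B2", "C2", "D2", "E2", "F2", 2)
    ).toDF("A", "B", "C", "D", "E", "F", "amt")

    aggregatePerColumn(df, "A", Seq("B", "C", "D", "E", "F"), "amt").show(false)
    spark.stop()
  }
}
```

Because the reduce starts from the first real aggregate, no placeholder row is needed, and amt keeps the type produced by sum instead of being widened to match a dummy 0.0.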
Upvotes: 2