Muz

Reputation: 759

How to calculate the product of columns followed by a sum over all columns?

Table 1 -- Spark DataFrame table

+---------+---+---+---+
|productMe|a  |b  |c  |
+---------+---+---+---+
|3        |0.2|0.5|0.4|
|6        |0.6|0.3|0.1|
|5        |0.4|0.6|0.5|
+---------+---+---+---+

There is a column called "productMe" in Table 1, along with other columns such as a, b, and c whose names are contained in an array T.

What I want is the inner product of each column in T with the column productMe, i.e. the row-wise product of the two columns (Table 2), and then the sum of each column of Table 2 to get Table 3.
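In other words, for each column c in T, the corresponding entry of Table 3 is the dot product of c with productMe: sum_i(c_i * productMe_i).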

Table 2 is not necessary if you have a good idea for getting Table 3 in one step.

Table 2 -- Inner product table

+-----------+-----------+-----------+
|a·productMe|b·productMe|c·productMe|
+-----------+-----------+-----------+
|0.6        |1.5        |1.2        |
|3.6        |1.8        |0.6        |
|2.0        |3.0        |2.5        |
+-----------+-----------+-----------+

For example, the column "a·productMe" is (3*0.2, 6*0.6, 5*0.4), which gives (0.6, 3.6, 2.0).

Table 3 -- sum table

+----------------+----------------+----------------+
|sum(a·productMe)|sum(b·productMe)|sum(c·productMe)|
+----------------+----------------+----------------+
|6.2             |6.3             |4.3             |
+----------------+----------------+----------------+

For example, the column "sum(a·productMe)" is 0.6+3.6+2=6.2.

Table 1 is a Spark DataFrame; how can I get Table 3?

Upvotes: 0

Views: 3220

Answers (4)

eliasah

Reputation: 40370

You can try something like the following:

val df = Seq(
  (3,0.2,0.5,0.4),
  (6,0.6,0.3,0.1),
  (5,0.4,0.6,0.5)).toDF("productMe", "a", "b", "c")
import org.apache.spark.sql.functions.{col, round, sum}
val columnsToSum = df.
  columns.  // <-- grab all the columns by their name
  tail.     // <-- skip productMe
  map(col). // <-- create Column objects
  map(c => round(sum(c * col("productMe")), 3).as(s"sum_${c}_productMe"))
val df2 = df.select(columnsToSum: _*)
df2.show()
+---------------+---------------+---------------+
|sum_a_productMe|sum_b_productMe|sum_c_productMe|
+---------------+---------------+---------------+
|            6.2|            6.3|            4.3|
+---------------+---------------+---------------+

The trick is to use df.select(columnsToSum: _*), which selects all the columns on which we computed the sum of the column times productMe. The : _* is Scala-specific syntax for passing a sequence as repeated (vararg) arguments, since we don't have a fixed number of arguments.
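As a tiny plain-Scala illustration of that repeated-argument expansion (hypothetical names, nothing Spark-specific):

def greet(names: String*) = names.mkString("Hello ", ", ", "!")

val people = Seq("Ann", "Bob")
greet(people: _*)  // the Seq is expanded into repeated arguments: "Hello Ann, Bob!"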

Upvotes: 7

Jacek Laskowski

Reputation: 74669

All the other answers use a sum aggregation, which uses groupBy under the covers.

groupBy always introduces a shuffle stage and usually (always?) is slower than corresponding window aggregates.

In this particular case, I also believe that window aggregates give better performance, as you can see in their physical plans and in the details of their single job.

CAUTION

Either solution uses a single partition to do the calculation, which in turn makes them unsuitable for large datasets, as the data may easily exceed the memory of a single JVM.
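Both points are easy to check with explain, which prints the physical plan (a minimal sketch; the tiny DataFrame is only for illustration):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

val df = Seq((3, 0.2), (6, 0.6), (5, 0.4)).toDF("productMe", "a")

// groupBy aggregation: the printed plan contains an Exchange (shuffle) operator.
df.groupBy().sum("a").explain()

// Empty-frame window aggregate: the plan moves all rows into a single partition.
df.select(sum("a") over Window.partitionBy()).explain()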

Window Aggregates

What follows is a window aggregate-based calculation which, in this particular case where we group over all the rows in a dataset, unfortunately gives the same physical plan. That makes my answer just a (hopefully) nice learning experience.

val df = Seq(
  (3,0.2,0.5,0.4),
  (6,0.6,0.3,0.1),
  (5,0.4,0.6,0.5)).toDF("productMe", "a", "b", "c")

// yes, I did borrow this trick with columns from @eliasah's answer
import org.apache.spark.sql.functions.{col, sum}
val columns = df.columns.tail.map(col).map(c => c * col("productMe") as s"${c}_productMe")
val multiplies = df.select(columns: _*)
scala> multiplies.show
+------------------+------------------+------------------+
|       a_productMe|       b_productMe|       c_productMe|
+------------------+------------------+------------------+
|0.6000000000000001|               1.5|1.2000000000000002|
|3.5999999999999996|1.7999999999999998|0.6000000000000001|
|               2.0|               3.0|               2.5|
+------------------+------------------+------------------+

def sumOverRows(name: String) = sum(name) over ()
val multipliesCols = multiplies.
  columns.
  map(c => sumOverRows(c) as s"sum_${c}")
val answer = multiplies.
  select(multipliesCols: _*).
  limit(1)  // <-- don't use distinct or dropDuplicates here
scala> answer.show
+-----------------+---------------+-----------------+
|  sum_a_productMe|sum_b_productMe|  sum_c_productMe|
+-----------------+---------------+-----------------+
|6.199999999999999|            6.3|4.300000000000001|
+-----------------+---------------+-----------------+

Physical Plan

Let's see the physical plan then (as it was the only reason why we wanted to see how to do the query using window aggregates, wasn't it?)

(screenshot: the physical plan of the query)

The following are the details of the only job, job 0.

(screenshot: details of job 0)

Upvotes: 2

Rahul Kanodiya

Reputation: 325

We can do it with simple Spark SQL:

val table1 = Seq(
  (3,0.2,0.5,0.4),
  (6,0.6,0.3,0.1),
  (5,0.4,0.6,0.5)
).toDF("productMe", "a", "b", "c")

table1.show
table1.createOrReplaceTempView("table1") 

val table2 = spark.sql("select a*productMe, b*productMe, c*productMe from table1")  // spark is the SparkSession here
table2.show

val table3 = spark.sql("select sum(a*productMe), sum(b*productMe), sum(c*productMe) from table1")
table3.show
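If the column names live in an array (the question's T), the same SQL can be assembled dynamically; a sketch, assuming t holds the names:

val t = Array("a", "b", "c")  // stand-in for the question's schema array T
val sumExprs = t.map(c => s"sum($c*productMe)").mkString(", ")
val table3Dynamic = spark.sql(s"select $sumExprs from table1")
table3Dynamic.show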

Upvotes: 3

Ramesh Maharjan

Reputation: 41957

If I understand your question correctly, then the following can be your solution:

val df = Seq(
  (3,0.2,0.5,0.4),
  (6,0.6,0.3,0.1),
  (5,0.4,0.6,0.5)
).toDF("productMe", "a", "b", "c")

This gives the input dataframe as you have it (you can add more columns):

+---------+---+---+---+
|productMe|a  |b  |c  |
+---------+---+---+---+
|3        |0.2|0.5|0.4|
|6        |0.6|0.3|0.1|
|5        |0.4|0.6|0.5|
+---------+---+---+---+

And

import org.apache.spark.sql.functions.{col, sum}

val productMe = df.columns.head
val colNames = df.columns.tail
var tempdf = df
for(column <- colNames){
  tempdf = tempdf.withColumn(column, col(column) * col(productMe))
}
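The same loop can be written without the mutable var by folding over the column names (an equivalent sketch):

// Fold over the column names, multiplying each by productMe along the way.
val tempdf2 = colNames.foldLeft(df) { (acc, column) =>
  acc.withColumn(column, col(column) * col(productMe))
}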

The above steps should give you Table 2:

+---------+------------------+------------------+------------------+
|productMe|a                 |b                 |c                 |
+---------+------------------+------------------+------------------+
|3        |0.6000000000000001|1.5               |1.2000000000000002|
|6        |3.5999999999999996|1.7999999999999998|0.6000000000000001|
|5        |2.0               |3.0               |2.5               |
+---------+------------------+------------------+------------------+

Table 3 can be achieved as follows:

tempdf.select(sum("a").as("sum(a.productMe)"), sum("b").as("sum(b.productMe)"), sum("c").as("sum(c.productMe)")).show(false)

Table 3 is:

+-----------------+----------------+-----------------+
|sum(a.productMe) |sum(b.productMe)|sum(c.productMe) |
+-----------------+----------------+-----------------+
|6.199999999999999|6.3             |4.300000000000001|
+-----------------+----------------+-----------------+

Table 2 can be achieved for any number of columns, but Table 3 would require you to define the columns explicitly.

Upvotes: 1
