mobupu

Reputation: 255

How to compute the numerical difference between columns of different dataframes?

Given two Spark DataFrames A and B with the same number of columns and rows, I want to compute the element-wise numerical difference between them and store it in another DataFrame (or, optionally, another data structure).

For instance, take the following datasets:

DataFrame A:

+----+---+
|  A | B |
+----+---+
|   1|  0|
|   1|  0|
+----+---+

DataFrame B:

+----+---+
|  A | B |
+----+---+
|   1| 0 |
|   0| 0 |
+----+---+

How can I obtain B - A, i.e.


+----+---+
| c1 | c2|
+----+---+
|   0| 0 |
|  -1| 0 |
+----+---+

In practice, the real DataFrames have a large number of rows and 50+ columns for which the difference needs to be computed. What is the Spark/Scala way of doing it?

Upvotes: 1

Views: 335

Answers (2)

pratiksadaphal

Reputation: 129

I was able to solve this using the approach below. The code works with any number of columns, as long as they are all of type Int (because of getAs[Int]). You just have to change the input DataFrames accordingly.

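import org.apache.spark.sql.Row
import spark.implicits._ // needed for toDF outside spark-shell

val df0 = Seq((1, 5), (1, 4)).toDF("a", "b")
val df1 = Seq((1, 0), (3, 2)).toDF("a", "b")

val columns = df0.columns

// Pair up rows by position. RDD.zip requires both RDDs to have the same
// number of partitions and the same number of elements per partition.
val rdd = df0.rdd.zip(df1.rdd).map { case (row0, row1) =>
  // df1 - df0 for every column; assumes all columns are Int
  val diffs = columns.map(c => row1.getAs[Int](c) - row0.getAs[Int](c))
  Row(diffs: _*)
}

spark.createDataFrame(rdd, df0.schema).show(false)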

Output generated:

df0=>
+---+---+
|a  |b  |
+---+---+
|1  |5  |
|1  |4  |
+---+---+
df1=>
+---+---+
|a  |b  |
+---+---+
|1  |0  |
|3  |2  |
+---+---+
Output=>
+---+---+
|a  |b  |
+---+---+
|0  |-5 |
|2  |-2 |
+---+---+
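
A note on the zip step: RDD.zip fails at runtime when the two RDDs differ in partition count or in elements per partition, which can easily happen with DataFrames loaded from different sources. Below is a minimal sketch of one possible alternative, not a definitive implementation. It attaches a positional index with zipWithIndex and joins on it, and it uses column expressions instead of getAs[Int], so other numeric types should work too. The helper withRowIndex and the column name row_idx are hypothetical names introduced for illustration, and the sketch assumes the row order of both DataFrames is meaningful and that no column is already called row_idx.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Local session for the sketch; in spark-shell this reuses the existing one.
val spark = SparkSession.builder().master("local[*]").appName("diff-sketch").getOrCreate()
import spark.implicits._

// Hypothetical helper: append a 0-based positional index column.
def withRowIndex(df: DataFrame, name: String = "row_idx"): DataFrame = {
  val indexed = df.rdd.zipWithIndex.map { case (row, idx) => Row.fromSeq(row.toSeq :+ idx) }
  val schema = StructType(df.schema.fields :+ StructField(name, LongType, nullable = false))
  df.sparkSession.createDataFrame(indexed, schema)
}

val df0 = Seq((1, 5), (1, 4)).toDF("a", "b")
val df1 = Seq((1, 0), (3, 2)).toDF("a", "b")

val left  = withRowIndex(df0).alias("l")
val right = withRowIndex(df1).alias("r")

// df1 - df0 for every column, keeping the original column names
val diffCols = df0.columns.map(c => (col(s"r.`$c`") - col(s"l.`$c`")).alias(c))

val diff = left.join(right, "row_idx").orderBy("row_idx").select(diffCols: _*)
diff.show(false)
```

For the sample inputs this should produce the same two rows as the output above, (0, -5) and (2, -2). The join adds a shuffle, so it trades some speed for not depending on how the two inputs are partitioned.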

Upvotes: 1

chlebek

Reputation: 2451

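If df A has the same shape as df B, you can try the approach below. I don't know if this will work correctly for large datasets: monotonically_increasing_id() guarantees unique, increasing ids, but not consecutive ones, and the values depend on partitioning. It would be better to join on an id column that already exists instead of generating one.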

  import spark.implicits._
  import org.apache.spark.sql.functions._

  val df0 = Seq((1, 0), (1, 0)).toDF("a", "b")
  val df1 = Seq((1, 0), (0, 0)).toDF("a", "b")

  // new cols names
  val colNamesA = df0.columns.map("A_" + _)
  val colNamesB = df0.columns.map("B_" + _)

  // rename cols and add id
  val dfA = df0.toDF(colNamesA: _*)
    .withColumn("id", monotonically_increasing_id())
  val dfB = df1.toDF(colNamesB: _*)
    .withColumn("id", monotonically_increasing_id())

  dfA.show()
  dfB.show()

  // get columns without id
  val dfACols = dfA.columns.dropRight(1).map(dfA(_))
  val dfBCols = dfB.columns.dropRight(1).map(dfB(_))

  // diff between cols (B - A)
  val calcCols = (dfACols zip dfBCols).map { case (a, b) => b - a }

  // join dfs
  val joined = dfA.join(dfB, "id")
  joined.show()

  calcCols.foreach(_.explain(true))

  joined.select(calcCols:_*).show()

  Output:

    +---+---+---+
    |A_a|A_b| id|
    +---+---+---+
    |  1|  0|  0|
    |  1|  0|  1|
    +---+---+---+

    +---+---+---+
    |B_a|B_b| id|
    +---+---+---+
    |  1|  0|  0|
    |  0|  0|  1|
    +---+---+---+

    +---+---+---+---+---+
    | id|A_a|A_b|B_a|B_b|
    +---+---+---+---+---+
    |  0|  1|  0|  1|  0|
    |  1|  1|  0|  0|  0|
    +---+---+---+---+---+

    (B_a#26 - A_a#18)
    (B_b#27 - A_b#19)

    +-----------+-----------+
    |(B_a - A_a)|(B_b - A_b)|
    +-----------+-----------+
    |          0|          0|
    |         -1|          0|
    +-----------+-----------+
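
When a key column already exists, the generated id can be skipped entirely. The following is a rough sketch under that assumption rather than a drop-in solution: it supposes both DataFrames carry a unique key column named id (an assumption, as the question shows no such column), joins on it, and aliases the differences to c1, c2, ... as in the question's expected output. The names keyedA, keyedB, lhs and rhs are made up for the example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Local session for the sketch; in spark-shell this reuses the existing one.
val spark = SparkSession.builder().master("local[*]").appName("keyed-diff-sketch").getOrCreate()
import spark.implicits._

// Assumption: both inputs already have a unique key column "id".
val keyedA = Seq((0L, 1, 0), (1L, 1, 0)).toDF("id", "a", "b")
val keyedB = Seq((0L, 1, 0), (1L, 0, 0)).toDF("id", "a", "b")

// Every column except the key takes part in the difference.
val valueCols = keyedA.columns.filterNot(_ == "id")

// B - A per column, renamed c1, c2, ... in column order
val diffCols = valueCols.zipWithIndex.map { case (name, i) =>
  (col(s"rhs.`$name`") - col(s"lhs.`$name`")).alias(s"c${i + 1}")
}

val keyedDiff = keyedA.alias("lhs")
  .join(keyedB.alias("rhs"), "id")
  .orderBy("id")
  .select(diffCols: _*)

keyedDiff.show()
```

With the sample data from the question this should give the rows (0, 0) and (-1, 0) under the headers c1 and c2. Because the column list is derived from keyedA.columns, the same code should carry over to 50+ columns without listing them by hand.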


Upvotes: 0
