vincwng

Reputation: 658

How to aggregate on two columns in Spark SQL

I have the table shown below and need to do the following:

  1. Group the rows by DepartmentID and EmployeeID.
  2. Within each group, order the rows by (ArrivalDate, ArrivalTime) and pick the latest one: if the dates differ, pick the row with the newer date; if the dates are the same, pick the row with the later time.

I am trying an approach along these lines:

input.select("DepartmenId", "EmolyeeID", "ArrivalDate", "ArrivalTime", "Word")
  .groupBy("DepartmenId", "EmolyeeID")
  .agg(/* here goes the function that handles the logic from step 2 */)
  .show()

What is the syntax to aggregate here?

Thank you in advance.

// +-----------+---------+-----------+-----------+--------+
// |DepartmenId|EmolyeeID|ArrivalDate|ArrivalTime|   Word |
// +-----------+---------+-----------+-----------+--------+
// |     D1    |   E1    |  20170101 |    0730   |  "YES" |
// |     D1    |   E1    |  20170102 |    1530   |  "NO"  |
// |     D1    |   E2    |  20170101 |    0730   |  "ZOO" |
// |     D1    |   E2    |  20170102 |    0330   |  "BOO" |
// |     D2    |   E1    |  20170101 |    0730   |  "LOL" |
// |     D2    |   E1    |  20170101 |    1830   |  "ATT" |
// |     D2    |   E2    |  20170105 |    1430   |  "UNI" |
// +-----------+---------+-----------+-----------+--------+


// output should be

// +-----------+---------+-----------+-----------+--------+
// |DepartmenId|EmolyeeID|ArrivalDate|ArrivalTime|   Word |
// +-----------+---------+-----------+-----------+--------+
// |     D1    |   E1    |  20170102 |    1530   |  "NO"  |
// |     D1    |   E2    |  20170102 |    0330   |  "BOO" |
// |     D2    |   E1    |  20170101 |    1830   |  "ATT" |
// |     D2    |   E2    |  20170105 |    1430   |  "UNI" |
// +-----------+---------+-----------+-----------+--------+

Upvotes: 0

Views: 1436

Answers (2)

Leo C

Reputation: 22449

One approach would be to use a Spark window function:

import spark.implicits._  // for toDF and the $"col" syntax

val df = Seq(
  ("D1", "E1", "20170101", "0730", "YES"),
  ("D1", "E1", "20170102", "1530", "NO"),
  ("D1", "E2", "20170101", "0730", "ZOO"),
  ("D1", "E2", "20170102", "0330", "BOO"),
  ("D2", "E1", "20170101", "0730", "LOL"),
  ("D2", "E1", "20170101", "1830", "ATT"),
  ("D2", "E2", "20170105", "1430", "UNI")
).toDF(
  "DepartmenId", "EmolyeeID", "ArrivalDate", "ArrivalTime", "Word"
)

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val df2 = df.withColumn("rowNum", row_number().over(
    Window.partitionBy("DepartmenId", "EmolyeeID").
      orderBy($"ArrivalDate".desc, $"ArrivalTime".desc)
  )).
  where($"rowNum" === 1).
  select("DepartmenId", "EmolyeeID", "ArrivalDate", "ArrivalTime", "Word").
  orderBy("DepartmenId", "EmolyeeID")

df2.show
+-----------+---------+-----------+-----------+----+
|DepartmenId|EmolyeeID|ArrivalDate|ArrivalTime|Word|
+-----------+---------+-----------+-----------+----+
|         D1|       E1|   20170102|       1530|  NO|
|         D1|       E2|   20170102|       0330| BOO|
|         D2|       E1|   20170101|       1830| ATT|
|         D2|       E2|   20170105|       1430| UNI|
+-----------+---------+-----------+-----------+----+
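
The same logic can also be written in Spark SQL syntax with ROW_NUMBER(). A minimal sketch, assuming df is registered under a temporary view name of your choosing (the name "arrivals" below is just an illustration, not part of the original answer):

// sketch only: "arrivals" is an assumed view name
df.createOrReplaceTempView("arrivals")

val latest = spark.sql("""
  SELECT DepartmenId, EmolyeeID, ArrivalDate, ArrivalTime, Word
  FROM (
    SELECT *,
           ROW_NUMBER() OVER (
             PARTITION BY DepartmenId, EmolyeeID
             ORDER BY ArrivalDate DESC, ArrivalTime DESC
           ) AS rowNum
    FROM arrivals
  ) t
  WHERE rowNum = 1
  ORDER BY DepartmenId, EmolyeeID
""")

latest.show()  // should print the same four rows as df2 above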

Upvotes: 2

Tzach Zohar

Reputation: 37852

You can use max on a new struct column that contains all of the non-grouping columns, with ArrivalDate first and ArrivalTime second. The ordering of that struct matches your requirement (latest date first; latest time among equal dates first), so taking the maximum produces the record you're after.

Then, you can use a select operation to "split" the struct back into separate columns.

import spark.implicits._
import org.apache.spark.sql.functions._

df.groupBy($"DepartmentID", $"EmployeeID")
  .agg(max(struct("ArrivalDate", "ArrivalTime", "Word")) as "struct")
  .select($"DepartmentID", $"EmployeeID",
    $"struct.ArrivalDate" as "ArrivalDate",
    $"struct.ArrivalTime" as "ArrivalTime",
    $"struct.Word" as "Word"
  )
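
Ordered by the grouping keys, this gives the same four rows as the desired output in the question. A quick usage sketch, assuming the expression above is assigned to a value named result (that name is not in the original answer):

// hypothetical: `result` holds the DataFrame produced above
result.orderBy("DepartmenId", "EmolyeeID").show()
// +-----------+---------+-----------+-----------+----+
// |DepartmenId|EmolyeeID|ArrivalDate|ArrivalTime|Word|
// +-----------+---------+-----------+-----------+----+
// |         D1|       E1|   20170102|       1530|  NO|
// |         D1|       E2|   20170102|       0330| BOO|
// |         D2|       E1|   20170101|       1830| ATT|
// |         D2|       E2|   20170105|       1430| UNI|
// +-----------+---------+-----------+-----------+----+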

Upvotes: 2
