Celso Marques

Reputation: 418

How to get the value from the previous group in Spark?

I need to get the value of the previous group in Spark and assign it to the current group. How can I achieve that? I must order by count instead of TEXT_NUM.

Ordering by TEXT_NUM is not possible because events repeat over time, as counts 10 and 11 show.

I'm trying the following code:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.last

val spark = SparkSession.builder()
  .master("spark://spark-master:7077")
  .getOrCreate()

val df = spark
  .createDataFrame(
    Seq[(Int, String, Int)](
      (0, "", 0),
      (1, "", 0),
      (2, "A", 1),
      (3, "A", 1),
      (4, "A", 1),
      (5, "B", 2),
      (6, "B", 2),
      (7, "B", 2),
      (8, "C", 3),
      (9, "C", 3),
      (10, "A", 1),
      (11, "A", 1)
    ))
  .toDF("count", "TEXT", "TEXT_NUM")

val w1 = Window
  .orderBy("count")
  .rangeBetween(Window.unboundedPreceding, -1)

df
  .withColumn("LAST_VALUE", last("TEXT_NUM").over(w1))
  .orderBy("count")
  .show()

Result:

+-----+----+--------+----------+
|count|TEXT|TEXT_NUM|LAST_VALUE|
+-----+----+--------+----------+
|    0|    |       0|      null|
|    1|    |       0|         0|
|    2|   A|       1|         0|
|    3|   A|       1|         1|
|    4|   A|       1|         1|
|    5|   B|       2|         1|
|    6|   B|       2|         2|
|    7|   B|       2|         2|
|    8|   C|       3|         2|
|    9|   C|       3|         3|
|   10|   A|       1|         3|
|   11|   A|       1|         1|
+-----+----+--------+----------+

Desired result:

+-----+----+--------+----------+
|count|TEXT|TEXT_NUM|LAST_VALUE|
+-----+----+--------+----------+
|    0|    |       0|      null|
|    1|    |       0|      null|
|    2|   A|       1|         0|
|    3|   A|       1|         0|
|    4|   A|       1|         0|
|    5|   B|       2|         1|
|    6|   B|       2|         1|
|    7|   B|       2|         1|
|    8|   C|       3|         2|
|    9|   C|       3|         2|
|   10|   A|       1|         3|
|   11|   A|       1|         3|
+-----+----+--------+----------+

Upvotes: 0

Views: 66

Answers (1)

Leo C

Reputation: 22439

Consider using the Window function last(columnName, ignoreNulls) to backfill nulls in a column that holds the previous "text_num" only at group boundaries, as shown below:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lag, last, when}
import spark.implicits._  // provides the $-syntax and toDF; `spark` is the active SparkSession

val df = Seq(
  (0, "", 0), (1, "", 0),
  (2, "A", 1), (3, "A", 1), (4, "A", 1),
  (5, "B", 2), (6, "B", 2), (7, "B", 2),
  (8, "C", 3), (9, "C", 3),
  (10, "A", 1), (11, "A", 1)
).toDF("count", "text", "text_num")

// w1 orders rows by "count"; w2 adds a running frame from the first row up to the current row
val w1 = Window.orderBy("count")
val w2 = w1.rowsBetween(Window.unboundedPreceding, 0)

df.
  withColumn("prev_num", lag("text_num", 1).over(w1)).
  withColumn("last_change", when($"text_num" =!= $"prev_num", $"prev_num")).
  withColumn("last_value", last("last_change", ignoreNulls=true).over(w2)).
  show
/*
+-----+----+--------+--------+-----------+----------+
|count|text|text_num|prev_num|last_change|last_value|
+-----+----+--------+--------+-----------+----------+
|    0|    |       0|    null|       null|      null|
|    1|    |       0|       0|       null|      null|
|    2|   A|       1|       0|          0|         0|
|    3|   A|       1|       1|       null|         0|
|    4|   A|       1|       1|       null|         0|
|    5|   B|       2|       1|          1|         1|
|    6|   B|       2|       2|       null|         1|
|    7|   B|       2|       2|       null|         1|
|    8|   C|       3|       2|          2|         2|
|    9|   C|       3|       3|       null|         2|
|   10|   A|       1|       3|          3|         3|
|   11|   A|       1|       1|       null|         3|
+-----+----+--------+--------+-----------+----------+
*/

The intermediate columns are kept in the output for reference; just drop them if they aren't needed, as shown in the sketch below.
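For reference, a minimal follow-up sketch (reusing the same df, w1 and w2 defined above) that drops the helper columns so the output matches the desired result, up to the lowercase column names:

df.
  withColumn("prev_num", lag("text_num", 1).over(w1)).
  withColumn("last_change", when($"text_num" =!= $"prev_num", $"prev_num")).
  withColumn("last_value", last("last_change", ignoreNulls=true).over(w2)).
  drop("prev_num", "last_change").
  orderBy("count").
  show

The final orderBy("count") before show only keeps the displayed rows in a deterministic order; the window computation itself does not depend on it.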

Upvotes: 1
