xyz_scala

Reputation: 471

Fill null or empty values with the next row's value in Spark

Is there a way to replace null values in a Spark DataFrame with the next row's non-null value? An additional row_count column has been added for window partitioning and ordering. More specifically, I'd like to achieve the following result:

      +---------+----+      +---------+----+
      |row_count|  id|      |row_count|  id|
      +---------+----+      +---------+----+
      |        1|null|      |        1| 109|
      |        2| 109|      |        2| 109|
      |        3|null|      |        3| 108|
      |        4|null|      |        4| 108|
      |        5| 108|  =>  |        5| 108|
      |        6|null|      |        6| 110|
      |        7| 110|      |        7| 110|
      |        8|null|      |        8|null|
      |        9|null|      |        9|null|
      |       10|null|      |       10|null|
      +---------+----+      +---------+----+

I tried the code below, but it is not giving the proper result.

      val ss = dataframe.select($"*",
        sum(when(dataframe("id").isNull || dataframe("id") === "", 1).otherwise(0))
          .over(Window.orderBy($"row_count")) as "value")
      val window1 = Window.partitionBy($"value").orderBy("id").rowsBetween(0, Long.MaxValue)
      val selectList = ss.withColumn("id_fill_from_below", last("id").over(window1))
        .drop($"row_count").drop($"value")
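For reference, a minimal single-pass sketch of the same idea using `first` with `ignoreNulls = true` over a forward-looking window (assuming a DataFrame `df` with the `row_count` and `id` columns shown above; note the unpartitioned window pulls all rows into one partition, which is fine for small data but does not scale):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.first

// Window from the current row to the end of the frame, ordered by row_count.
val w = Window.orderBy("row_count")
  .rowsBetween(Window.currentRow, Window.unboundedFollowing)

// first(..., ignoreNulls = true) returns the next non-null id at or after
// the current row; trailing rows with no later non-null value stay null.
val filled = df.withColumn("id", first("id", ignoreNulls = true).over(w))
```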

Upvotes: 1

Views: 2332

Answers (1)

Ranga Vure

Reputation: 1932

Here is an approach:

  1. Filter the non-null rows (dfNonNulls)
  2. Filter the null rows (dfNulls)
  3. Find the right value for each null id, using a join and a Window function
  4. Fill the null DataFrame (dfNullFills)
  5. Union dfNonNulls and dfNullFills

data.csv

row_count,id
1,
2,109
3,
4,
5,108
6,
7,110
8,
9,
10,

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

var df = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("data.csv")

var dfNulls = df.filter(
  $"id".isNull
).withColumnRenamed(
  "row_count","row_count_nulls"
).withColumnRenamed(
  "id","id_nulls"
)

val dfNonNulls = df.filter(
  $"id".isNotNull
).withColumnRenamed(
  "row_count","row_count_values"
).withColumnRenamed(
  "id","id_values"
)

dfNulls = dfNulls.join(
  dfNonNulls, $"row_count_nulls" lt $"row_count_values","left"
).select(
  $"id_nulls",$"id_values",$"row_count_nulls",$"row_count_values"
)

val window = Window.partitionBy("row_count_nulls").orderBy("row_count_values")

val dfNullFills = dfNulls.withColumn(
  "rn", row_number.over(window)
).where($"rn" === 1).drop("rn").select(
  $"row_count_nulls".alias("row_count"),$"id_values".alias("id"))

dfNullFills.union(dfNonNulls).orderBy($"row_count").show()

which results in:

+---------+----+
|row_count|  id|
+---------+----+
|        1| 109|
|        2| 109|
|        3| 108|
|        4| 108|
|        5| 108|
|        6| 110|
|        7| 110|
|        8|null|
|        9|null|
|       10|null|
+---------+----+

Upvotes: 0
