Yi Du

Reputation: 525

Fill in missing row values with the previous and next non-missing values

I know you can forward/backward fill missing values with the next non-missing value, using the last function combined with a window function.

But my data looks like this:

Area,Date,Population
A, 1/1/2000, 10000
A, 2/1/2000, 
A, 3/1/2000, 
A, 4/1/2000, 10030
A, 5/1/2000, 

In this example, filling in 10030 for the May population is easy. But for Feb and Mar, I would like to fill in the mean of 10000 and 10030, not 10000 or 10030 alone.
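In other words, the fill rule I want can be sketched in plain Python (a hypothetical helper for illustration, not Spark code):

```python
def fill_population(values):
    """Fill None gaps: mean of the nearest non-null neighbours if both exist,
    otherwise whichever neighbour is available (plain forward/backward fill)."""
    n = len(values)
    result = list(values)
    for i, v in enumerate(values):
        if v is not None:
            continue
        # nearest non-null value before and after position i
        prev = next((values[j] for j in range(i - 1, -1, -1) if values[j] is not None), None)
        nxt = next((values[j] for j in range(i + 1, n) if values[j] is not None), None)
        if prev is not None and nxt is not None:
            result[i] = (prev + nxt) / 2
        else:
            result[i] = prev if prev is not None else nxt
    return result

print(fill_population([10000, None, None, 10030, None]))
# [10000, 10015.0, 10015.0, 10030, 10030]
```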

Do you know how to implement this?

Thanks,

Upvotes: 0

Views: 526

Answers (2)

Som

Reputation: 6323

Get the previous and next non-null values and compute the mean, as below:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, first, last}
import spark.implicits._ // assumes a SparkSession named spark, as in spark-shell

df2.show(false)
df2.printSchema()
/**
  * +----+--------+----------+
  * |Area|Date    |Population|
  * +----+--------+----------+
  * |A   |1/1/2000|10000     |
  * |A   |2/1/2000|null      |
  * |A   |3/1/2000|null      |
  * |A   |4/1/2000|10030     |
  * |A   |5/1/2000|null      |
  * +----+--------+----------+
  *
  * root
  * |-- Area: string (nullable = true)
  * |-- Date: string (nullable = true)
  * |-- Population: integer (nullable = true)
  */

// previous: last non-null value up to and including the current row (forward fill)
val w1 = Window.partitionBy("Area").orderBy("Date").rowsBetween(Window.unboundedPreceding, Window.currentRow)
// next: first non-null value from the current row onwards (backward fill)
val w2 = Window.partitionBy("Area").orderBy("Date").rowsBetween(Window.currentRow, Window.unboundedFollowing)

df2.withColumn("previous", last("Population", ignoreNulls = true).over(w1))
  .withColumn("next", first("Population", ignoreNulls = true).over(w2))
  .withColumn("new_Population", (coalesce($"previous", $"next") + coalesce($"next", $"previous")) / 2)
  .drop("next", "previous")
  .show(false)

/**
  * +----+--------+----------+--------------+
  * |Area|Date    |Population|new_Population|
  * +----+--------+----------+--------------+
  * |A   |1/1/2000|10000     |10000.0       |
  * |A   |2/1/2000|null      |10015.0       |
  * |A   |3/1/2000|null      |10015.0       |
  * |A   |4/1/2000|10030     |10030.0       |
  * |A   |5/1/2000|10030     |10030.0       |
  * +----+--------+----------+--------------+
  */
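The coalesce trick in the last withColumn (mean when both neighbours exist, otherwise whichever one is available) can be checked in plain Python; this is just a sketch of the expression, not Spark code:

```python
def coalesce(*args):
    # Return the first non-None argument, mimicking Spark SQL's coalesce.
    return next((a for a in args if a is not None), None)

def new_population(previous, nxt):
    # Mirrors (coalesce(previous, next) + coalesce(next, previous)) / 2.
    a, b = coalesce(previous, nxt), coalesce(nxt, previous)
    return None if a is None else (a + b) / 2

print(new_population(10000, 10030))  # 10015.0  (gap between two values)
print(new_population(10030, None))   # 10030.0  (trailing gap: both sides fall back to 10030)
print(new_population(None, None))    # None     (no neighbour at all)
```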

Upvotes: 1

Lamanus

Reputation: 13541

Here is my try.

w1 and w2 are used to build the partition labels, and w3 and w4 are used to pick up the preceding and following non-null values. After that, you can apply a condition to decide how to fill Population.

import pyspark.sql.functions as f
from pyspark.sql import Window

w1 = Window.partitionBy('Area').orderBy('Date').rowsBetween(Window.unboundedPreceding, Window.currentRow)
w2 = Window.partitionBy('Area').orderBy('Date').rowsBetween(Window.currentRow, Window.unboundedFollowing)
w3 = Window.partitionBy('Area', 'partition1').orderBy('Date')
w4 = Window.partitionBy('Area', 'partition2').orderBy(f.desc('Date'))

(df
  .withColumn('check', f.col('Population').isNotNull().cast('int'))
  # running counts of non-null values; every row in a gap shares a label
  # with its nearest non-null neighbour
  .withColumn('partition1', f.sum('check').over(w1))
  .withColumn('partition2', f.sum('check').over(w2))
  # first row of each group = the preceding / following non-null Population
  .withColumn('first', f.first('Population').over(w3))
  .withColumn('last', f.first('Population').over(w4))
  # mean of both neighbours when they exist, otherwise whichever is available
  .withColumn('fill', f.when(f.col('first').isNotNull() & f.col('last').isNotNull(),
                             (f.col('first') + f.col('last')) / 2)
                       .otherwise(f.coalesce('first', 'last')))
  .withColumn('Population', f.coalesce('Population', 'fill'))
  .orderBy('Date')
  .select(*df.columns)
  .show(10, False))

+----+--------+----------+
|Area|Date    |Population|
+----+--------+----------+
|A   |1/1/2000|10000.0   |
|A   |2/1/2000|10015.0   |
|A   |3/1/2000|10015.0   |
|A   |4/1/2000|10030.0   |
|A   |5/1/2000|10030.0   |
+----+--------+----------+
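The partition-label trick (the check/partition1/partition2 columns) can be sketched without Spark. A running count of non-null flags from the top groups each gap with the value before it; the same count from the bottom groups it with the value after it:

```python
from itertools import accumulate

values = [10000, None, None, 10030, None]
check = [0 if v is None else 1 for v in values]

# partition1: running sum of non-null flags from the start (forward groups)
partition1 = list(accumulate(check))
# partition2: running sum from the end (backward groups)
partition2 = list(accumulate(check[::-1]))[::-1]

print(partition1)  # [1, 1, 1, 2, 2] -> rows 2-3 grouped with the 10000 row
print(partition2)  # [2, 1, 1, 1, 0] -> rows 2-3 grouped with the 10030 row
```

Rows that share a partition1 label start with the same preceding non-null value, and rows that share a partition2 label end with the same following non-null value, which is exactly what w3 and w4 then extract with first().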

Upvotes: 0
