Reputation: 525
I know you can forward/backward fill missing values with the next or last non-missing value using the last function combined with a window function.
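For example, a plain forward fill looks roughly like this (a minimal PySpark sketch; I'm assuming a DataFrame named df with the columns shown below):
import pyspark.sql.functions as f
from pyspark.sql import Window

# carry the most recent non-null Population forward within each Area
w = Window.partitionBy('Area').orderBy('Date').rowsBetween(Window.unboundedPreceding, Window.currentRow)
df_ffill = df.withColumn('Population_ffill', f.last('Population', ignorenulls=True).over(w))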
But I have data that looks like:
Area,Date,Population
A, 1/1/2000, 10000
A, 2/1/2000,
A, 3/1/2000,
A, 4/1/2000, 10030
A, 5/1/2000,
In this example, for the May population I would like to fill in 10030, which is easy. But for Feb and Mar, I would like to fill in the mean of 10000 and 10030, i.e. (10000 + 10030) / 2 = 10015, not just 10000 or 10030.
Do you know how to implement this?
Thanks,
Upvotes: 0
Views: 526
Reputation: 6323
Get the next and previous value and compute the mean as below:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._ // for the $ column syntax

df2.show(false)
df2.printSchema()
/**
* +----+--------+----------+
* |Area|Date |Population|
* +----+--------+----------+
* |A |1/1/2000|10000 |
* |A |2/1/2000|null |
* |A |3/1/2000|null |
* |A |4/1/2000|10030 |
* |A |5/1/2000|null |
* +----+--------+----------+
*
* root
* |-- Area: string (nullable = true)
* |-- Date: string (nullable = true)
* |-- Population: integer (nullable = true)
*/
// w1 looks backwards from the current row, w2 looks forwards
val w1 = Window.partitionBy("Area").orderBy("Date").rowsBetween(Window.unboundedPreceding, Window.currentRow)
val w2 = Window.partitionBy("Area").orderBy("Date").rowsBetween(Window.currentRow, Window.unboundedFollowing)
df2.withColumn("previous", last("Population", ignoreNulls = true).over(w1)) // last non-null value up to the current row
  .withColumn("next", first("Population", ignoreNulls = true).over(w2)) // first non-null value from the current row onwards
  .withColumn("new_Population", (coalesce($"previous", $"next") + coalesce($"next", $"previous")) / 2) // mean when both exist, otherwise whichever side is present
  .drop("next", "previous")
  .show(false)
/**
* +----+--------+----------+--------------+
* |Area|Date |Population|new_Population|
* +----+--------+----------+--------------+
* |A |1/1/2000|10000 |10000.0 |
* |A |2/1/2000|null |10015.0 |
* |A |3/1/2000|null |10015.0 |
* |A |4/1/2000|10030 |10030.0 |
* |A |5/1/2000|null |10030.0 |
* +----+--------+----------+--------------+
*/
Upvotes: 1
Reputation: 13541
Here is my attempt. w1 and w2 are used to build the partitions, and w3 and w4 are used to pick up the preceding and following values. After that, you can apply a condition that decides how to fill Population.
import pyspark.sql.functions as f
from pyspark.sql import Window
# w1/w2: running count of non-null Population values looking backwards/forwards
w1 = Window.partitionBy('Area').orderBy('Date').rowsBetween(Window.unboundedPreceding, Window.currentRow)
w2 = Window.partitionBy('Area').orderBy('Date').rowsBetween(Window.currentRow, Window.unboundedFollowing)
# w3/w4: within each resulting group, pick the preceding/following non-null value
w3 = Window.partitionBy('Area', 'partition1').orderBy('Date')
w4 = Window.partitionBy('Area', 'partition2').orderBy(f.desc('Date'))
# flag non-null rows, build the forward/backward groups, then average the bracketing values
df.withColumn('check', f.col('Population').isNotNull().cast('int')) \
.withColumn('partition1', f.sum('check').over(w1)) \
.withColumn('partition2', f.sum('check').over(w2)) \
.withColumn('first', f.first('Population').over(w3)) \
.withColumn('last', f.first('Population').over(w4)) \
.withColumn('fill', f.when(f.col('first').isNotNull() & f.col('last').isNotNull(), (f.col('first') + f.col('last')) / 2).otherwise(f.coalesce('first', 'last'))) \
.withColumn('Population', f.coalesce('Population', 'fill')) \
.orderBy('Date') \
.select(*df.columns).show(10, False)
+----+--------+----------+
|Area|Date |Population|
+----+--------+----------+
|A |1/1/2000|10000.0 |
|A |2/1/2000|10015.0 |
|A |3/1/2000|10015.0 |
|A |4/1/2000|10030.0 |
|A |5/1/2000|10030.0 |
+----+--------+----------+
Upvotes: 0