Reputation: 31
Is there any way to replace null values in a PySpark DataFrame with the next non-null value? I need to fill the null prices in this table:
+----------+----------+-----+
|product_id| ts|price|
+----------+----------+-----+
| 1|2024-05-01| null|
| 1|2024-05-02| 109|
| 1|2024-05-03| 120|
| 2|2024-05-01| null|
| 2|2024-05-02| null|
| 2|2024-05-03| 115|
+----------+----------+-----+
Expected result:
+----------+----------+-----+
|product_id| ts|price|
+----------+----------+-----+
| 1|2024-05-01| 109|
| 1|2024-05-02| 109|
| 1|2024-05-03| 120|
| 2|2024-05-01| 115|
| 2|2024-05-02| 115|
| 2|2024-05-03| 115|
+----------+----------+-----+
df = [
    {"product_id": 1, "ts": "2024-05-01"},
    {"product_id": 1, "ts": "2024-05-02", "price": 109},
    {"product_id": 1, "ts": "2024-05-03", "price": 120},
    {"product_id": 2, "ts": "2024-05-01"},
    {"product_id": 2, "ts": "2024-05-02"},
    {"product_id": 2, "ts": "2024-05-03", "price": 115},
]
data = spark.createDataFrame(df)
I tried this function:

def fill_na_prices(data: DataFrame) -> DataFrame:
    return data.withColumn(
        "price",
        F.first("price", ignorenulls=True).over(
            Window.partitionBy("product_id").orderBy("ts")
        ),
    )

but it doesn't work.
Upvotes: 0
Views: 33
Reputation: 823
How about this?
from pyspark.sql import Window
import pyspark.sql.functions as F
# Frame covering the rows after the current one, so we look ahead for the next non-null price.
windowSpec = (
    Window.partitionBy("product_id")
    .orderBy("ts")
    .rowsBetween(Window.currentRow + 1, Window.unboundedFollowing)
)

# Replace a null price with the first non-null price that follows it; keep existing prices as-is.
data = data.withColumn(
    "price",
    F.when(
        data["price"].isNull(),
        F.first("price", ignorenulls=True).over(windowSpec),
    ).otherwise(data["price"]),
)
data.show()
+----------+----------+-----+
|product_id| ts|price|
+----------+----------+-----+
| 1|2024-05-01| 109|
| 1|2024-05-02| 109|
| 1|2024-05-03| 120|
| 2|2024-05-01| 115|
| 2|2024-05-02| 115|
| 2|2024-05-03| 115|
+----------+----------+-----+
This code will still leave null values if the trailing rows of a partition are all NULL. Is that what you want?
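If you do want those trailing nulls filled as well, here is a minimal sketch of one way to extend this (my suggestion, not something the question asked for): fall back to the last preceding non-null price when no following value exists.

from pyspark.sql import Window
import pyspark.sql.functions as F

# Look ahead for the next non-null price (this frame includes the current row) ...
following = (
    Window.partitionBy("product_id")
    .orderBy("ts")
    .rowsBetween(Window.currentRow, Window.unboundedFollowing)
)
# ... and look back for the last non-null price as a fallback.
preceding = (
    Window.partitionBy("product_id")
    .orderBy("ts")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

data = data.withColumn(
    "price",
    F.coalesce(
        F.first("price", ignorenulls=True).over(following),  # backfill
        F.last("price", ignorenulls=True).over(preceding),   # forward fill for trailing nulls
    ),
)

Because the forward-looking frame includes the current row, coalesce leaves rows that already have a price untouched.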
Upvotes: 1