Reputation: 419
Data
  Col1 Col2  result
0    a    x   123.0
1    a    y     NaN
2    a    x   453.0
3    a    y   675.0
4    b    z   786.0
5    b    z   332.0
I want to fill the NaN by grouping first on Col1, then on Col2, and filling within each group, so the NaN above should become 675.0.
In Pandas
df['result'] = df['result'].fillna(df.groupby(['Col1', 'Col2'])['result'].bfill())
df['result'] = df['result'].fillna(df.groupby(['Col1', 'Col2'])['result'].ffill())
How can I implement this in PySpark?
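For reference, the pandas snippet above runs as-is on the sample data; a minimal, self-contained version (column names taken from the table above):

```python
import pandas as pd
import numpy as np

df = pd.DataFrame({"Col1": ["a", "a", "a", "a", "b", "b"],
                   "Col2": ["x", "y", "x", "y", "z", "z"],
                   "result": [123.0, np.nan, 453.0, 675.0, 786.0, 332.0]})

# back-fill, then forward-fill, within each (Col1, Col2) group
df['result'] = df['result'].fillna(df.groupby(['Col1', 'Col2'])['result'].bfill())
df['result'] = df['result'].fillna(df.groupby(['Col1', 'Col2'])['result'].ffill())
# the NaN in row 1 (group a/y) is taken from row 3 of the same group
```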
Upvotes: 1
Views: 968
Reputation: 318
This can be done with a grouped-map pandas UDF; inside it you can call the pandas functions you want directly.
[IN]
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd
import numpy as np

s = pd.DataFrame({"col1": ["a", "a", "a", "a", "b", "b"],
                  "col2": ["x", "y", "x", "y", "z", "z"],
                  "result": [123, np.nan, 453, 675, 786, 332]})
spark_df = spark.createDataFrame(s)
grouped_spark_df = spark_df.groupBy("col1", "col2")

@pandas_udf("col1 string, col2 string, result float", PandasUDFType.GROUPED_MAP)
def fillnaspark(df):
    # back-fill, then forward-fill, within each (col1, col2) group
    df['result'] = df['result'].bfill()
    df['result'] = df['result'].ffill()
    return df

grouped_spark_df.apply(fillnaspark).show()
[OUT]
+----+----+------+
|col1|col2|result|
+----+----+------+
| a| x| 123.0|
| a| x| 453.0|
| b| z| 786.0|
| b| z| 332.0|
| a| y| 675.0|
| a| y| 675.0|
+----+----+------+
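Note that PandasUDFType.GROUPED_MAP is deprecated in Spark 3.x in favor of applyInPandas, which takes a plain pandas function. A minimal sketch (the Spark call, shown commented, assumes an active spark session and the spark_df above):

```python
import pandas as pd

def fillna_group(pdf: pd.DataFrame) -> pd.DataFrame:
    # Back-fill, then forward-fill, 'result' within a single (col1, col2) group.
    pdf['result'] = pdf['result'].bfill().ffill()
    return pdf

# With an active SparkSession (Spark 3.x):
# spark_df.groupBy("col1", "col2").applyInPandas(
#     fillna_group, schema="col1 string, col2 string, result double"
# ).show()
```

The function itself is ordinary pandas, so it can be tested without Spark.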
Upvotes: 2
Reputation: 42332
You can use nanvl to replace NaN with the previous (lag) value of result, which is the equivalent of ffill; bfill corresponds to lead:
from pyspark.sql import functions as F, Window
df2 = df.withColumn(
    'id', F.monotonically_increasing_id()
).withColumn(
    'result',
    F.nanvl(
        'result',
        F.coalesce(
            F.lag('result').over(Window.partitionBy('Col1', 'Col2').orderBy('id')),
            F.lead('result').over(Window.partitionBy('Col1', 'Col2').orderBy('id'))
        )
    )
).orderBy('id')
df2.show()
+---+----+----+------+
| id|Col1|Col2|result|
+---+----+----+------+
| 0| a| x| 123.0|
| 1| a| y| 675.0|
| 2| a| x| 453.0|
| 3| a| y| 675.0|
|  4|   b|   z| 786.0|
|  5|   b|   z| 332.0|
+---+----+----+------+
Upvotes: 0