Pedro

Reputation: 45

PySpark: replace NA by searching another column for the same value

A value in column_1 can't map to multiple values in column_2, so rows with the same id always share the same column_2 value.

column_1 column_2
    52     A
    78     B
    52 

Expected

column_1 column_2
    52     A
    78     B
    52     A

That is, for a row with a missing column_2, search column_1 for another row with the same id and take its column_2 value.

I have a working solution in R, but I couldn't find a similar approach in PySpark.

Upvotes: 0

Views: 41

Answers (2)

Vaebhav

Reputation: 5052

Since the same id will always have the same value, as you have stated, one way to achieve this is to use the inherent sequence order present within your data and use the lagged value to populate the missing values.

You can utilise the Lag function to generate the previous value associated with each col_1 partition, and Coalesce to take the first non-null value of the two.

Data Preparation

import pandas as pd
from pyspark.sql import SparkSession, Window, functions as F

# SparkSession used to create the Spark DataFrame
sql = SparkSession.builder.getOrCreate()

df = pd.DataFrame({
        'col_1': [52,78,52,52,78,78],
        'col_2': ['A','B',None,'A','B',None]
})

sparkDF = sql.createDataFrame(df)

sparkDF.show()

+-----+-----+
|col_1|col_2|
+-----+-----+
|   52|    A|
|   78|    B|
|   52| null|
|   52|    A|
|   78|    B|
|   78| null|
+-----+-----+

Lag

window = Window.partitionBy('col_1').orderBy(F.col('col_2').desc())

sparkDF = sparkDF.withColumn('col_2_lag', F.lag('col_2').over(window))

sparkDF.show()

+-----+-----+---------+
|col_1|col_2|col_2_lag|
+-----+-----+---------+
|   52|    A|     null|
|   52|    A|        A|
|   52| null|        A|
|   78|    B|     null|
|   78|    B|        B|
|   78| null|        B|
+-----+-----+---------+

Coalesce

sparkDF = sparkDF.withColumn('col_2', F.coalesce(F.col('col_2'), F.col('col_2_lag'))).drop('col_2_lag')

sparkDF.show()

+-----+-----+
|col_1|col_2|
+-----+-----+
|   52|    A|
|   52|    A|
|   52|    A|
|   78|    B|
|   78|    B|
|   78|    B|
+-----+-----+
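
If you prefer a single step, the lag and coalesce can also be chained into one withColumn call. This is only a condensed sketch of the code above, assuming sparkDF and the same window specification are already defined:

# Condensed variant: compute the lagged value and coalesce in a single expression.
# Assumes sparkDF and window from the steps above.
sparkDF = sparkDF.withColumn(
    'col_2',
    F.coalesce(F.col('col_2'), F.lag('col_2').over(window))
)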

Upvotes: 2

Steven

Reputation: 15283

I would do something like this, using max:

from pyspark.sql import functions as F, Window

df.withColumn(
    "column_2",
    F.coalesce(
        F.col("column_2"), F.max("column_2").over(Window.partitionBy("column_1"))
    ),
).show()
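
For reference, a minimal end-to-end sketch of this approach on the question's sample data could look like the following; the SparkSession variable spark is an assumption and not part of the original answer:

from pyspark.sql import SparkSession, functions as F, Window

# Assumed SparkSession; adjust to your environment.
spark = SparkSession.builder.getOrCreate()

# Sample data from the question: one row of id 52 has a missing column_2.
df = spark.createDataFrame(
    [(52, "A"), (78, "B"), (52, None)],
    ["column_1", "column_2"],
)

# Fill the null by taking the max of column_2 within each column_1 partition.
df.withColumn(
    "column_2",
    F.coalesce(
        F.col("column_2"), F.max("column_2").over(Window.partitionBy("column_1"))
    ),
).show()
# Expected: the missing column_2 for id 52 is filled with 'A' (row order may vary).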

Upvotes: 1
