Patterson

Reputation: 2821

PySpark Code Modification to Remove Nulls

I received help with the following PySpark code to prevent errors when doing a Merge in Databricks; see the earlier question:

Databricks Error: Cannot perform Merge as multiple source rows matched and attempted to modify the same target row in the Delta table conflicting way

I was wondering if I could get help to modify the code to drop NULLs.

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

df2 = partdf.withColumn("rn", row_number().over(Window.partitionBy("P_key").orderBy("Id")))
df3 = df2.filter("rn = 1").drop("rn")

Thanks

Upvotes: 0

Views: 136

Answers (1)

Saideep Arikontham

Reputation: 6114

  • The code you are using does not remove the rows where P_key is null. Window.partitionBy("P_key") places all null P_key values in one partition, row_number() numbers the rows in that partition like any other, and the null row that receives rn = 1 passes the rn = 1 filter, so it is not deleted.

  • You can use df.na.drop with the subset argument to remove the rows where P_key is null:
df.na.drop(subset=["P_key"]).show(truncate=False)

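To make your original approach work, add a dummy row that has a null P_key and the smallest possible unique id, and store that id in a variable. The dummy row takes rn = 1 in the null partition, so every real null row gets rn > 1 and is removed by the rn = 1 filter. An additional condition in the filter then removes the dummy row itself, as shown below.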

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number,when,col
df = spark.read.option("header",True).csv("dbfs:/FileStore/sample1.csv")
#adding row with least possible id value.
dup_id = '0'
new_row = spark.createDataFrame([[dup_id,'','x','x']], schema = ['id','P_key','c1','c2'])

#replacing empty string with null for P_Key
new_row = new_row.withColumn('P_key',when(col('P_key')=='',None).otherwise(col('P_key')))
df = df.union(new_row) #row added

#code to remove duplicates
df2 = df.withColumn("rn", row_number().over(Window.partitionBy("P_key").orderBy("id")))
df2.show(truncate=False)

#additional condition to remove the added row (matched on id, because its P_key is null).
df3 = df2.filter((df2.rn == 1) & (df2["id"] != dup_id)).drop("rn")
df3.show()

Upvotes: 1
