Emma

Reputation: 41

PySpark dataframe: How to randomly drop one row if there are two duplicated rows for the same primary key? (Databricks)

I'm working in Azure Databricks with PySpark. There is an existing dataframe like the one below:

df:

id  Name   Age  Country
1   Mike   18   USA
2   Lily   25   Japan
2   Lily   22   Japan
3   John   23   China
4   Snow   36   Korea
5   Tom    28   Austria
5   Cindy  28   Italy

column "id" is the primary key and cannot have duplicated values, which means all the numbers should appear only once, but now as you could see, number 2 and 5 have two rows with different values. I need to keep only one row for id 2 and one row for id 5 only. We can drop randomly and keep the other row, or we can keep only the upper row if there are duplicated rows

I need to have the following:

id  Name  Age  Country
1   Mike  18   USA
2   Lily  25   Japan
3   John  23   China
4   Snow  36   Korea
5   Tom   28   Austria

or below is also fine:

id  Name   Age  Country
1   Mike   18   USA
2   Lily   22   Japan
3   John   23   China
4   Snow   36   Korea
5   Cindy  28   Italy

How should I write the scripts?

Thanks.

Upvotes: 0

Views: 315

Answers (2)

samkart

Reputation: 6644

You could simply use dropDuplicates() with id as the subset column.

See the example below:

# given the following input
# +---+---+---+---+
# | id| c1| c2| c3|
# +---+---+---+---+
# |  1|foo|bar|baz|
# |  1|foo|baz|bar|
# |  2|foo|bar|baz|
# |  2|foo|bar|baz|
# +---+---+---+---+

data_sdf. \
    dropDuplicates(subset=['id']). \
    show()

# +---+---+---+---+
# | id| c1| c2| c3|
# +---+---+---+---+
# |  1|foo|bar|baz|
# |  2|foo|bar|baz|
# +---+---+---+---+
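Note that dropDuplicates() keeps an arbitrary row per id, which also satisfies the "drop randomly" part of the question. Applied to the dataframe from the question, a minimal sketch would look like this (assuming a Databricks notebook where the SparkSession is already available as spark):

# minimal sketch, assuming `spark` is an existing SparkSession (as in Databricks)
data = [
    (1, "Mike", 18, "USA"),
    (2, "Lily", 25, "Japan"),
    (2, "Lily", 22, "Japan"),
    (3, "John", 23, "China"),
    (4, "Snow", 36, "Korea"),
    (5, "Tom", 28, "Austria"),
    (5, "Cindy", 28, "Italy"),
]
df = spark.createDataFrame(data, ["id", "Name", "Age", "Country"])

# keep one row per id; which duplicate survives is not guaranteed
df.dropDuplicates(subset=["id"]).show()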

Upvotes: 0

Aswin

Reputation: 7136

One approach to remove duplicate records is to add a row number column to the dataframe for each id group and then keep only the first row for each group.

Code:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# Define the schema for the dataframe
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True),
    StructField("Country", StringType(), True)
])

# Create the dataframe
data = [
    (1, "Mike", 18, "USA"),
    (2, "Lily", 25, "Japan"),
    (2, "Lily", 22, "Japan"),
    (3, "John", 23, "China"),
    (4, "Snow", 36, "Korea"),
    (5, "Tom", 28, "Austria"),
    (5, "Cindy", 28, "Italy")
]

df = spark.createDataFrame(data, schema)

# Show the dataframe
df.show()

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
# Define a window that partitions by id and orders by Name
# (any column works here if it does not matter which duplicate is kept)
window = Window.partitionBy("id").orderBy("Name")

# Add a row number column to the dataframe
df = df.withColumn("row_number", row_number().over(window))

# Keep only the first row for each id
df = df.filter(df.row_number == 1).drop("row_number")

# Show the resulting dataframe
df.show()

  • Window.partitionBy("id").orderBy("Name"): This defines a window function that partitions the dataframe by the id column and orders the rows within each partition by the Name column.

  • df.withColumn("row_number", row_number().over(window)): This adds a new column called row_number to the dataframe, which contains the row number for each partition defined by the window function.

  • df.filter(df.row_number == 1).drop("row_number"): This filters the dataframe to keep only the rows where the row_number column is equal to 1, which corresponds to the first row within each partition. It then drops the row_number column from the resulting dataframe.

Output:

id  Name   Age  Country
1   Mike   18   USA
2   Lily   25   Japan
3   John   23   China
4   Snow   36   Korea
5   Cindy  28   Italy
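
For reference, the same row_number logic can also be written in Spark SQL instead of the DataFrame API. A minimal sketch, assuming the original (pre-filter) dataframe was first registered as a temporary view with df.createOrReplaceTempView("people"), where "people" is a hypothetical view name:

# the same row_number dedup in Spark SQL; "people" is a hypothetical
# temp view holding the original (pre-filter) dataframe
spark.sql("""
    SELECT id, Name, Age, Country
    FROM (
        SELECT *,
               ROW_NUMBER() OVER (PARTITION BY id ORDER BY Name) AS rn
        FROM people
    ) t
    WHERE rn = 1
""").show()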


Upvotes: 0
