James Adams

Reputation: 8737

PySpark: How to keep only the rows from the left table when dropping duplicates after a full join?

How can I keep the rows that came from the left table when dropping duplicates after a full join? I want all rows from both tables, except that when a name appears in both tables, the row from the right table should be discarded.

I have tried the below, but without success:

schema = ["name", "source", "score"]

rows1 = [("Smith", "Acme", 98),
         ("Jones", "Acme", 30),
         ("Finch", "Acme", 62),
         ("Lewis", "Acme", 59),
         ("Starr", "Acme", 87)]

rows2 = [("Smith", "Beta", 64),
         ("Jones", "Beta", 75),
         ("Bryan", "Beta", 93),
         ("Lewis", "Beta", 59),
         ("Starr", "Beta", 81)]

df1 = spark.createDataFrame(rows1, schema)
df2 = spark.createDataFrame(rows2, schema)
jdf = df1.alias("a").join(other=df2.alias("b"), on="name", how="full")
jdf = jdf.orderBy("name").dropDuplicates(["name"])
for col in ["source", "score"]:
    jdf = jdf.withColumn(colName=col, col=(F.col("a."+col) if F.col("a."+col).isNotNull() else F.col("b."+col)))

The above is invalid and results in this error:

ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.

The result I am trying to achieve:

+-----+------+-----+
| name|source|score|
+-----+------+-----+
|Bryan|  Beta|   93|
|Finch|  Acme|   62|
|Jones|  Acme|   30|
|Lewis|  Acme|   59|
|Smith|  Acme|   98|
|Starr|  Acme|   87|
+-----+------+-----+

Upvotes: 0

Views: 1853

Answers (2)

James Adams

Reputation: 8737

This appears to work: a left-anti join (how="leftanti") of the right table against the left table returns the right-table rows whose name has no match in the left table. A union of the left table with those rows gives the desired result.

schema = ["name", "source", "score"]

rows1 = [("Smith", "Acme", 98),
         ("Jones", "Acme", 30),
         ("Finch", "Acme", 62),
         ("Lewis", "Acme", 59),
         ("Starr", "Acme", 87)]

rows2 = [("Smith", "Beta", 64),
         ("Jones", "Beta", 75),
         ("Bryan", "Beta", 93),
         ("Lewis", "Beta", 59),
         ("Starr", "Beta", 81)]

df1 = spark.createDataFrame(rows1, schema)
df2 = spark.createDataFrame(rows2, schema)

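# Rows of df2 whose name does not appear in df1 (only df2's columns are returned)
jdf = df2.join(df1, on="name", how="leftanti")
# All rows of df1 plus the unmatched rows of df2; union matches columns by position
udf = df1.union(jdf)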

Upvotes: 0

Cena

Reputation: 3419

You can use the coalesce function. For each row, it returns the value of the first of its argument columns that is not null. So pass the column from the left table first, followed by the column from the right table.

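from pyspark.sql.functions import coalesce

# Rename the right table's columns so they do not clash with the left table's after the join
df2 = df2.withColumnRenamed("source", "source2").withColumnRenamed("score", "score2")

# Prefer the left value; fall back to the right value only where the left is null
df1.join(df2, ['name'], "full").withColumn('source', coalesce('source', 'source2'))\
    .withColumn('score', coalesce('score', 'score2')).drop('source2', 'score2').show()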

+-----+------+-----+
| name|source|score|
+-----+------+-----+
|Bryan|  Beta|   93|
|Finch|  Acme|   62|
|Jones|  Acme|   30|
|Lewis|  Acme|   59|
|Smith|  Acme|   98|
|Starr|  Acme|   87|
+-----+------+-----+

Upvotes: 1
