Reputation: 8737
How can I keep the rows that came from the left table when dropping duplicates after a full join? I want all rows from both tables, except where a row appears in both, in which case I keep the left table's row and drop the right table's.
I have tried the below, but without success:
from pyspark.sql import functions as F

schema = ["name", "source", "score"]
rows1 = [("Smith", "Acme", 98),
         ("Jones", "Acme", 30),
         ("Finch", "Acme", 62),
         ("Lewis", "Acme", 59),
         ("Starr", "Acme", 87)]
rows2 = [("Smith", "Beta", 64),
         ("Jones", "Beta", 75),
         ("Bryan", "Beta", 93),
         ("Lewis", "Beta", 59),
         ("Starr", "Beta", 81)]
df1 = spark.createDataFrame(rows1, schema)
df2 = spark.createDataFrame(rows2, schema)
jdf = df1.alias("a").join(other=df2.alias("b"), on="name", how="full")
jdf = jdf.orderBy("name").dropDuplicates(["name"])
for col in ["source", "score"]:
    jdf = jdf.withColumn(colName=col, col=(F.col("a."+col) if F.col("a."+col).isNotNull() else F.col("b."+col)))
The above is invalid and results in this error:
ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.
The result I am trying to achieve:
+-----+------+-----+
| name|source|score|
+-----+------+-----+
|Bryan| Beta| 93|
|Finch| Acme| 62|
|Jones| Acme| 30|
|Lewis| Acme| 59|
|Smith| Acme| 98|
|Starr| Acme| 87|
+-----+------+-----+
Upvotes: 0
Views: 1853
Reputation: 8737
Something that appears to work: I used a "leftanti" join to get the rows from the right table that have no match in the left table, then unioned that with the left table.
schema = ["name", "source", "score"]
rows1 = [("Smith", "Acme", 98),
         ("Jones", "Acme", 30),
         ("Finch", "Acme", 62),
         ("Lewis", "Acme", 59),
         ("Starr", "Acme", 87)]
rows2 = [("Smith", "Beta", 64),
         ("Jones", "Beta", 75),
         ("Bryan", "Beta", 93),
         ("Lewis", "Beta", 59),
         ("Starr", "Beta", 81)]
df1 = spark.createDataFrame(rows1, schema)
df2 = spark.createDataFrame(rows2, schema)
jdf = df2.join(df1, on="name", how="leftanti")
udf = df1.union(jdf)
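For intuition, the left-anti + union logic can be sketched in plain Python with no Spark session (this mirrors what the PySpark code above does on this data, not the Spark internals):

```python
rows1 = [("Smith", "Acme", 98), ("Jones", "Acme", 30), ("Finch", "Acme", 62),
         ("Lewis", "Acme", 59), ("Starr", "Acme", 87)]
rows2 = [("Smith", "Beta", 64), ("Jones", "Beta", 75), ("Bryan", "Beta", 93),
         ("Lewis", "Beta", 59), ("Starr", "Beta", 81)]

# "leftanti": keep only right-table rows whose name has no match in the left table
left_names = {name for name, _, _ in rows1}
anti = [row for row in rows2 if row[0] not in left_names]

# "union": all left rows plus the unmatched right rows
result = sorted(rows1 + anti)
```

After sorting by name, `result` matches the target table: Bryan's Beta row survives because it has no Acme counterpart, while every duplicated name keeps its Acme row. Note that `union` in Spark, like the list concatenation here, does not deduplicate or sort by itself.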
Upvotes: 0
Reputation: 3419
You can use the coalesce function, which returns the first non-null value among its arguments. So pass the column from the left table first, then the column from the right table.
df2 = df2.withColumnRenamed("source", "source2").withColumnRenamed("score", "score2")
from pyspark.sql.functions import coalesce
df1.join(df2, ['name'], "full").withColumn('source', coalesce('source', 'source2'))\
.withColumn('score', coalesce('score', 'score2')).drop('source2', 'score2').show()
+-----+------+-----+
| name|source|score|
+-----+------+-----+
|Bryan| Beta| 93|
|Finch| Acme| 62|
|Jones| Acme| 30|
|Lewis| Acme| 59|
|Smith| Acme| 98|
|Starr| Acme| 87|
+-----+------+-----+
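The first-non-null behavior of coalesce can be sketched in plain Python (a toy stand-in for the Spark function, not its actual implementation):

```python
def coalesce(*values):
    """Return the first value that is not None, or None if all are."""
    return next((v for v in values if v is not None), None)

# After a full join, a matched name has both sources, so the left wins;
# an unmatched name has None on one side, so the other side fills in.
coalesce("Acme", "Beta")  # -> "Acme"
coalesce(None, "Beta")    # -> "Beta"
```

This is why renaming the right-hand columns first matters: coalesce needs two distinct column names to pick between.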
Upvotes: 1