Reputation: 963
Let's assume we have two DataFrames that we want to compare for differences with a leftanti join:
from pyspark.sql.types import StructType, StructField, IntegerType

data1 = [
    (1, 11, 20, None),
    (2, 12, 22, 31),
]
data2 = [
    (1, 11, 20, None),
    (2, 12, 22, 31),
]
schema = StructType([
    StructField("value_1", IntegerType(), True),
    StructField("value_2", IntegerType(), True),
    StructField("value_3", IntegerType(), True),
    StructField("value_4", IntegerType(), True),
])
df1 = spark.createDataFrame(data=data1, schema=schema)
df2 = spark.createDataFrame(data=data2, schema=schema)
How can I null-safe join these DataFrames by multiple (all) columns? The only solution I came up with is as follows:
df = df1.join(
    df2,
    (df1.value_1.eqNullSafe(df2.value_1)) &
    (df1.value_2.eqNullSafe(df2.value_2)) &
    (df1.value_3.eqNullSafe(df2.value_3)) &
    (df1.value_4.eqNullSafe(df2.value_4)),
    "leftanti"
)
But unfortunately we now have to deal with a dynamic list containing a huge number of columns. How could we rewrite this join so that we can provide a list of columns to join on?
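For intuition, the behaviour I'm after can be modelled in plain Python (no Spark): a leftanti diff keeps the left rows that have no null-safe match on the right, where `None <=> None` counts as equal. This is only a sketch of the semantics, with hypothetical helper names:

```python
from functools import reduce

def null_safe_eq(a, b):
    # SQL's <=> operator: two NULLs (None) compare equal, NULL vs. a value is False
    if a is None and b is None:
        return True
    if a is None or b is None:
        return False
    return a == b

def leftanti_diff(left, right):
    # Keep left rows that have no null-safe match (on all columns) in right
    def matches(l, r):
        return reduce(lambda acc, pair: acc and null_safe_eq(*pair),
                      zip(l, r), True)
    return [l for l in left if not any(matches(l, r) for r in right)]

data1 = [(1, 11, 20, None), (2, 12, 22, 31)]
data2 = [(1, 11, 20, None), (2, 12, 22, 31)]
print(leftanti_diff(data1, data2))  # [] -> identical data, no differences
```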
THX & BR
Upvotes: 2
Views: 3787
Reputation: 13
Though both solutions above work, the join columns are repeated in the resulting DataFrame. This solution, wrapped in a generalized user-defined function, works on Spark 3.4.1 and above to perform the null-safe join and remove the duplicated columns:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col

def null_safe_join(left_df: DataFrame, right_df: DataFrame, join_cols: list, how: str) -> DataFrame:
    # Rename the right-hand join columns so they can be dropped after the join
    right_df = right_df.withColumnsRenamed({i: f'drop_{i}' for i in join_cols})
    join_condition = [col(i).eqNullSafe(col(f'drop_{i}')) for i in join_cols]
    joined_df = left_df.join(right_df, join_condition, how).drop(*[f'drop_{i}' for i in join_cols])
    return joined_df
Upvotes: 0
Reputation: 1
You can just use a list comprehension, as in the solution given above, and you will get the same result:
df = df1.join(df2, [df1[k].eqNullSafe(df2[k]) for k in join_cols], "leftanti")
Or, my preference would be to use alias, so that I can use the processed DataFrame immediately instead of creating new variables first. For example:
from pyspark.sql.functions import col, lit

df = (
    df1
    .withColumn('left_or_right', lit('left'))
    .alias('left')
    .join(
        df2
        .withColumn('left_or_right', lit('right'))
        .alias('right'),
        [col(f'left.{c}').eqNullSafe(col(f'right.{c}')) for c in join_cols],
        'leftanti'
    )
)
df.sort('left.value_1').show(10, False)
Upvotes: 0
Reputation: 6644
As far as I understand the problem statement, you want to create a dynamic join condition based on a list of columns that one provides. We can do that using reduce() from the functools module.
from functools import reduce

join_cols = ['value_1', 'value_2', 'value_3', 'value_4']

join_condition = reduce(lambda x, y: x & y, [df1[k].eqNullSafe(df2[k]) for k in join_cols])
print(join_condition)
# Column<'((((value_1 <=> value_1) AND (value_2 <=> value_2)) AND (value_3 <=> value_3)) AND (value_4 <=> value_4))'>
You can then pass join_condition to .join() directly.
df = df1.join(df2, join_condition, "leftanti")
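Note that reduce folds left to right, which is why the printed condition nests the way it does. The same fold can be illustrated in plain Python over strings (the parts list here is purely hypothetical, just to show the nesting):

```python
from functools import reduce

# Model the left-fold nesting that reduce produces when combining with `&`
parts = ['(v1 <=> v1)', '(v2 <=> v2)', '(v3 <=> v3)']
expr = reduce(lambda x, y: f'({x} AND {y})', parts)
print(expr)  # (((v1 <=> v1) AND (v2 <=> v2)) AND (v3 <=> v3))
```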
Upvotes: 5