Reputation: 1412
I am wondering which of the two approaches below is more efficient in Spark for producing the following four DataFrames.
Approach 1: (join - 1, filter - 4)
merged_df = left_df.join(right_df, join_condition, how='full_outer')
df1 = merged_df.filter(sf.col('right_df.col1').isNull()).select('left_df.*')
df2 = merged_df.filter(sf.col('right_df.col1').isNotNull()).select('left_df.*')
df3 = merged_df.filter(sf.col('left_df.col1').isNull()).select('right_df.*')
df4 = merged_df.filter(sf.col('left_df.col1').isNotNull()).select('right_df.*')
Approach 2: (join - 4, filter - 0)
df1 = left_df.join(right_df, join_condition, how='left_anti')
df2 = left_df.join(right_df, join_condition, how='left_semi')
# Spark has no 'right_anti'/'right_semi' join types, so swap the sides and use left_anti/left_semi:
df3 = right_df.join(left_df, join_condition, how='left_anti')
df4 = right_df.join(left_df, join_condition, how='left_semi')
and
join_condition = (sf.col('left_df.col1') == sf.col('right_df.col1'))
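For reference, both snippets assume sf is pyspark.sql.functions and that the two DataFrames are aliased so the qualified names like left_df.col1 resolve; a minimal sketch of that setup (the data here is purely illustrative):
import pyspark.sql.functions as sf

# Aliases are what make 'left_df.col1' / 'right_df.col1' and select('left_df.*') resolvable
left_df = spark.createDataFrame([(1, 'a'), (2, 'b')], ['col1', 'v1']).alias('left_df')
right_df = spark.createDataFrame([(2, 'x'), (3, 'y')], ['col1', 'v2']).alias('right_df')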
Which of the above-mentioned approaches is more efficient?
Ref: https://medium.com/bild-journal/pyspark-joins-explained-9c4fba124839
EDIT
Consider col1 to be a primary key column (i.e. non-nullable) in both DataFrames.
Upvotes: 3
Views: 234
Reputation: 9427
Before commenting on efficiency, I just want to point out that, generally speaking, the df_n in the two approaches may not be identical:
>>> from pyspark.sql.functions import col
>>> df1 = spark.createDataFrame([{'id1': 0, 'val1': "a"},{'id1': 1, 'val1': "b"},{'id1': None, 'val1': "df1"}])
>>> df2 = spark.createDataFrame([{'id2': 1, 'val2': "d"},{'id2': 2, 'val2': "e"},{'id2': None, 'val2': "df2"}])
>>> df1.show()
+----+----+
| id1|val1|
+----+----+
| 0| a|
| 1| b|
|null| df1|
+----+----+
>>> df2.show()
+----+----+
| id2|val2|
+----+----+
| 1| d|
| 2| e|
|null| df2|
+----+----+
>>> df1.join(df2, col("id1") == col("id2"), how="full_outer").show()
+----+----+----+----+
| id1|val1| id2|val2|
+----+----+----+----+
| 0| a|null|null|
|null| df1|null|null|
|null|null|null| df2|
| 1| b| 1| d|
|null|null| 2| e|
+----+----+----+----+
>>> df1.join(df2, col("id1") == col("id2"), how="full_outer").filter(col('id2').isNull()).select(df1["*"]).show()
+----+----+
| id1|val1|
+----+----+
| 0| a|
|null| df1|
|null|null|
+----+----+
>>> df1.join(df2, col("id1") == col("id2"), how="left_anti").show()
+----+----+
| id1|val1|
+----+----+
| 0| a|
|null| df1|
+----+----+
>>> df1.join(df2, col('id1') == col('id2'), how='full_outer').filter(col('id2').isNotNull()).select(df1['*']).show()
+----+----+
| id1|val1|
+----+----+
| 1| b|
|null|null|
+----+----+
>>> df1.join(df2, col('id1') == col('id2'), how='left_semi').show()
+---+----+
|id1|val1|
+---+----+
| 1| b|
+---+----+
This is, of course, because of how nulls are treated by SQL joins, and because the result of a 'full_outer' join contains all unmatched rows from both sides. The latter means that the k2.isNotNull() filter used to create df2 (the "semi-join"), for example, will not eliminate the null-filled rows produced by right-hand keys that do not match anything on the left-hand side of the full outer join. For example:
>>> df1 = spark.createDataFrame([{'k1': 0, 'v1': "a"},{'k1': 1, 'v1': "b"},{'k1': 2, 'v1': "c"}])
>>> df2 = spark.createDataFrame([{'k2': 2, 'v2': "d"},{'k2': 3, 'v2': "e"},{'k2': 4, 'v2': "f"}])
>>> df1.join(df2, col('k1') == col('k2'), how="full_outer").filter(col('k2').isNotNull()).select(df1["*"]).show()
+----+----+
| k1| v1|
+----+----+
|null|null|
| 2| c|
|null|null|
+----+----+
>>> df1.join(df2, col('k1') == col('k2'), how="left_semi").show()
+---+---+
| k1| v1|
+---+---+
| 2| c|
+---+---+
Upvotes: 3
Reputation: 2436
[Posting my answer hoping it could be revised by a more experienced user]
I'd say it won't matter much. Spark reorganizes these operations during optimization, so if the end result is the same, the DAG (Directed Acyclic Graph) and the execution plan should end up roughly the same.
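One way to check this claim on your own data is to compare the physical plans of the two ways of building, say, df1 (this sketch assumes the aliases, sf, and join_condition from the question):
# Approach 1: full outer join plus filter
merged_df = left_df.join(right_df, join_condition, how='full_outer')
merged_df.filter(sf.col('right_df.col1').isNull()).select('left_df.*').explain()

# Approach 2: anti join
left_df.join(right_df, join_condition, how='left_anti').explain()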
If the objective is performance, then a single join would be more convenient because it can take advantage of a broadcast join (if the right-hand DataFrame is small enough to fit in memory).
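A sketch of that broadcast hint, assuming right_df is the small side:
from pyspark.sql.functions import broadcast

# Hint Spark to ship the small DataFrame to every executor instead of shuffling both sides
df2 = left_df.join(broadcast(right_df), join_condition, how='left_semi')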
Upvotes: 0