newbie

Reputation: 1412

What is the best way to get different join outputs in PySpark?

I am wondering which approach is more efficient in Spark to get the 4 dataframes below.

Approach 1: (join - 1, filter - 4)

from pyspark.sql import functions as sf

merged_df = left_df.join(right_df, join_condition, how='full_outer')
df1 = merged_df.filter(sf.col('right_df.col1').isNull()).select('left_df.*')     # intended left-anti: left rows without a match
df2 = merged_df.filter(sf.col('right_df.col1').isNotNull()).select('left_df.*')  # intended left-semi: left rows with a match
df3 = merged_df.filter(sf.col('left_df.col1').isNull()).select('right_df.*')     # intended right-anti: right rows without a match
df4 = merged_df.filter(sf.col('left_df.col1').isNotNull()).select('right_df.*')  # intended right-semi: right rows with a match

Approach 2: (join - 4, filter - 0)

df1 = left_df.join(right_df, join_condition, how='left_anti')
df2 = left_df.join(right_df, join_condition, how='left_semi')
# PySpark has no 'right_anti'/'right_semi' join types, so the sides are swapped:
df3 = right_df.join(left_df, join_condition, how='left_anti')
df4 = right_df.join(left_df, join_condition, how='left_semi')

where, in both approaches:

join_condition = (sf.col('left_df.col1') == sf.col('right_df.col1'))
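
For the string-qualified references like 'left_df.col1' to resolve, both dataframes have to be aliased first. A minimal setup sketch (assuming the dataframes are otherwise plain):

left_df = left_df.alias('left_df')     # register alias so 'left_df.col1' resolves
right_df = right_df.alias('right_df')  # likewise for 'right_df.col1'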

Which of the above approaches is more efficient?

Ref: https://medium.com/bild-journal/pyspark-joins-explained-9c4fba124839

EDIT: Consider col1 to be a primary key column (i.e. non-nullable) in both dataframes.

Upvotes: 3

Views: 234

Answers (2)

mazaneicha

Reputation: 9427

Before commenting on efficiency, I just want to point out that, generally speaking, the df_n produced by the two approaches may not be identical:

>>> from pyspark.sql.functions import col
>>> df1 = spark.createDataFrame([{'id1': 0, 'val1': "a"},{'id1': 1, 'val1': "b"},{'id1': None, 'val1': "df1"}])
>>> df2 = spark.createDataFrame([{'id2': 1, 'val2': "d"},{'id2': 2, 'val2': "e"},{'id2': None, 'val2': "df2"}])

>>> df1.show()
+----+----+
| id1|val1|
+----+----+
|   0|   a|
|   1|   b|
|null| df1|
+----+----+

>>> df2.show()
+----+----+
| id2|val2|
+----+----+
|   1|   d|
|   2|   e|
|null| df2|
+----+----+

>>> df1.join(df2, col("id1") == col("id2"), how="full_outer").show()                                                                 
+----+----+----+----+
| id1|val1| id2|val2|
+----+----+----+----+
|   0|   a|null|null|
|null| df1|null|null|
|null|null|null| df2|
|   1|   b|   1|   d|
|null|null|   2|   e|
+----+----+----+----+

>>> df1.join(df2, col("id1") == col("id2"), how="full_outer").filter(col('id2').isNull()).select(df1["*"]).show()
+----+----+
| id1|val1|
+----+----+
|   0|   a|
|null| df1|
|null|null|
+----+----+

>>> df1.join(df2, col("id1") == col("id2"), how="left_anti").show()
+----+----+
| id1|val1|
+----+----+
|   0|   a|
|null| df1|
+----+----+

>>> df1.join(df2, col('id1') == col('id2'), how='full_outer').filter(col('id2').isNotNull()).select(df1['*']).show()
+----+----+
| id1|val1|
+----+----+
|   1|   b|
|null|null|
+----+----+

>>> df1.join(df2, col('id1') == col('id2'), how='left_semi').show()
+---+----+
|id1|val1|
+---+----+
|  1|   b|
+---+----+

This is, of course, because of how nulls are treated by SQL joins, and because the result of a 'full_outer' join contains all unmatched rows from both sides. The latter means that the col('k2').isNotNull() filter used to create df2 (the "semi-join" substitute), for example, will not eliminate the null-filled rows produced by right-hand keys that match nothing on the left-hand side of the full outer join. For example:

>>> df1 = spark.createDataFrame([{'k1': 0, 'v1': "a"},{'k1': 1, 'v1': "b"},{'k1': 2, 'v1': "c"}])                                           
>>> df2 = spark.createDataFrame([{'k2': 2, 'v2': "d"},{'k2': 3, 'v2': "e"},{'k2': 4, 'v2': "f"}])
>>> df1.join(df2, col('k1') == col('k2'), how="full_outer").filter(col('k2').isNotNull()).select(df1["*"]).show()                           
+----+----+                                                                     
|  k1|  v1|
+----+----+
|null|null|
|   2|   c|
|null|null|
+----+----+
>>> df1.join(df2, col('k1') == col('k2'), how="left_semi").show()
+---+---+                                                                       
| k1| v1|
+---+---+
|  2|  c|
+---+---+
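
With non-nullable keys (as in the question's EDIT), one way to make the full_outer variant agree with the semi-join is to require the key to be non-null on both sides. A sketch, reusing the k1/k2 frames above (still not identical if right-hand keys repeat: the outer join duplicates each left row once per match, while left_semi returns it at most once):

# Keep only rows where both keys matched, then project the left side.
# With unique, non-nullable keys this reproduces the left_semi result above.
df1.join(df2, col('k1') == col('k2'), how='full_outer') \
   .filter(col('k1').isNotNull() & col('k2').isNotNull()) \
   .select(df1['*']) \
   .show()   # -> single row (2, c), same as the left_semi output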

Upvotes: 3

Luiz Viola

Reputation: 2436

[Posting my answer hoping it could be revised by a more experienced user]

I'd say it won't matter much. Spark reorganizes these operations during optimization, so if the end result is the same, the DAG (Directed Acyclic Graph) and the execution plan will end up roughly the same.
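
One way to verify this claim rather than guess is to compare the physical plans that Spark actually executes. A sketch, reusing the (assumed) names from the question:

# explain() prints the optimized physical plan for each variant; compare them.
merged_df = left_df.join(right_df, join_condition, how='full_outer')
merged_df.filter(sf.col('right_df.col1').isNull()).select('left_df.*').explain()
left_df.join(right_df, join_condition, how='left_anti').explain()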

If the objective is performance, then a single join could be more convenient because it can take advantage of a broadcast join (if the dataframe on the right is not too big and fits in memory), as sketched below.
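
A minimal sketch of the broadcast hint (assuming right_df is small enough to fit on each executor):

from pyspark.sql.functions import broadcast

# Hint Spark to ship right_df to every executor instead of shuffling both sides.
merged_df = left_df.join(broadcast(right_df), join_condition, how='full_outer')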

Upvotes: 0
