pat_triccia

Reputation: 45

Outer join in PySpark between a huge DF and a very small DF

I'm trying to do an outer join of DF1 (433 million rows) with DF2 (14 rows) in PySpark, and I immediately get an OOM error. Is Spark not suited to this kind of join?

from pyspark.sql.functions import broadcast
result = DF1.join(broadcast(DF2), how='outer', on='key_column')

I have looked at broadcast joins and at partitioning by the join key, but neither works in my case. Is there another solution?

Here is my conf:

use-spark --driver-memory 16G --executor-memory 40G --executor-cores 6

I've enabled auto-broadcast:

spark.conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", "256MB")

But when I run explain, I see that it is not using the broadcast:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [coalesce(CD_TYP#11, CD_TYP#40) AS CD_TYP#3095, col1#8L, col2#9L, col3#10, col4#12, col5#13, col6#14, col7#15, col8#16, col9#17L, col10#18, col11#19, col12#20, col13#21, col14#22, col15#23, col16#41]
   +- SortMergeJoin [CD_TYP#11], [CD_TYP#40], FullOuter
      :- Sort [CD_TYP#11 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(CD_TYP#11, 200), ENSURE_REQUIREMENTS, [plan_id=6329]
      :     +- FileScan parquet [all calls-names replaced for security-] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[abfs://[email protected]/data/v1.0.0/20240..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<col1:bigint,col2:bigint,col3:string,col4:string,col5:timestamp,NO...
      +- Sort [CD_TYP#40 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(CD_TYP#40, 200), ENSURE_REQUIREMENTS, [plan_id=6330]
            +- FileScan parquet [CD_TYP#40,LB_TYP#41] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[abfs://[email protected]/data/v1.0.0/20240..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<CD_TYP:string,LB_TYP:string>


24/05/16 14:48:31 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build right for full outer join.
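
The warning above says the broadcast hint is dropped because the right side cannot be the build side of a full outer join, which is why the plan falls back to SortMergeJoin. One workaround that is sometimes suggested (it is not from the original post, and is untested on this data) is to split the full outer join into two parts: a left join, which can use a broadcast of the small side, plus the small-side rows whose key never appears in the big side. Below is a minimal plain-Python sketch of that identity, without Spark. The rows and all function names are made up for illustration; only the column names CD_TYP and LB_TYP come from the plan above.

```python
from collections import defaultdict

def full_outer_reference(big, small, key):
    """Naive full outer join, used only as a reference result."""
    out, matched_small = [], set()
    for b in big:
        hit = False
        for i, s in enumerate(small):
            # SQL semantics: a NULL (None) key never matches anything.
            if b[key] is not None and b[key] == s[key]:
                out.append((b, s))
                matched_small.add(i)
                hit = True
        if not hit:
            out.append((b, None))
    # Small-side rows that matched nothing on the big side.
    out.extend((None, s) for i, s in enumerate(small) if i not in matched_small)
    return out

def left_join_with_lookup(big, small, key):
    """Left join: the small side is held in a dict, like a broadcast copy."""
    lookup = defaultdict(list)
    for s in small:
        if s[key] is not None:
            lookup[s[key]].append(s)
    out = []
    for b in big:
        matches = lookup.get(b[key], [])
        if matches:
            out.extend((b, s) for s in matches)
        else:
            out.append((b, None))
    return out

def anti_join(small, big, key):
    """Small-side rows whose key is absent from the big side."""
    big_keys = {b[key] for b in big if b[key] is not None}
    return [(None, s) for s in small if s[key] not in big_keys]

def full_outer_via_left_plus_anti(big, small, key):
    """Full outer join rebuilt as left join + anti join."""
    return left_join_with_lookup(big, small, key) + anti_join(small, big, key)

# Hypothetical sample rows, not taken from the real tables.
big = [
    {"CD_TYP": "A", "col1": 1},
    {"CD_TYP": "B", "col1": 2},
    {"CD_TYP": "Z", "col1": 3},
    {"CD_TYP": None, "col1": 4},
]
small = [
    {"CD_TYP": "A", "LB_TYP": "alpha"},
    {"CD_TYP": "B", "LB_TYP": "beta"},
    {"CD_TYP": "C", "LB_TYP": "gamma"},
]

result = full_outer_via_left_plus_anti(big, small, "CD_TYP")
print(result == full_outer_reference(big, small, "CD_TYP"))
```

In PySpark terms this would presumably correspond to a `how='left'` join against `broadcast(DF2)`, unioned with a `how='left_anti'` join of DF2 against the distinct keys of DF1. Whether that avoids the OOM here is an assumption that would need to be checked against the real plan.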

Then, when I run a count or try to write the table to actually execute the join, the error is:

Executor lost connection due to JVM Out Of Memory (OOM) error with exit code 52. 

And:

org.apache.spark.shuffle.FetchFailedException

Thank you!
