Reputation: 255
In Cloud Data Fusion,
I am using a Joiner transform to join two tables.
One of them is a large table with about 87M records, while the other is a smaller table with only ~250 records. I am using 200 partitions in the Joiner.
This causes the following failure:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 50 in stage 7.0 failed 4 times, most recent failure: Lost task 50.3 in stage 7.0 (TID xxx, cluster_workerx.c.project.internal, executor 6): ExecutorLostFailure (executor 6 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 133355 ms java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.spark.SparkException: Application application_xxxxx finished with failed status
On closer inspection in the Spark UI, of the 200 tasks for the join, nearly 80% of the 87M records go to a single task's output, which fails with the heartbeat error, while each task that succeeds outputs very few records (~<10k).
It seems Spark performs a shuffle hash join. Is there a way in Data Fusion/CDAP to force a broadcast join, since one of my tables is very small? Or can I make some configuration changes to the cluster config to make this join work?
What performance tuning can I apply to the Data Fusion pipeline? I didn't find any reference to configuration or tuning in the Data Fusion documentation.
Upvotes: 1
Views: 525
Reputation: 2178
You can use org.apache.spark.sql.functions.broadcast(Dataset[T])
to mark a DataFrame/Dataset to be broadcast when it is joined. A broadcast is not always guaranteed, but for 250 records it will work. If the DataFrame with 87M rows is evenly partitioned, this should improve the performance.
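For illustration, a minimal sketch of this in Spark Scala (the input paths and the join key column "id" are hypothetical placeholders, not taken from your pipeline):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().appName("broadcast-join-sketch").getOrCreate()

// Hypothetical inputs: the large (~87M rows) and small (~250 rows) tables.
val large = spark.read.parquet("gs://my-bucket/large-table")
val small = spark.read.parquet("gs://my-bucket/small-table")

// broadcast() hints Spark to ship the small side to every executor, so the
// join runs map-side and avoids the skewed shuffle that overloads one task.
val joined = large.join(broadcast(small), Seq("id"))

joined.write.parquet("gs://my-bucket/joined-output")
```

Spark can also broadcast automatically when the small side is below spark.sql.autoBroadcastJoinThreshold (10 MB by default); whether the Data Fusion Joiner plugin executes its join through that DataFrame code path depends on the plugin version, so verify the config route against your CDAP/Data Fusion release.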
Upvotes: 1