Reputation: 45692
I haven't been able to figure out how the Spark SQL join operation actually works. I have read a pretty extensive explanation, but it still doesn't shed light on a few questions.
For example, suppose you have two database tables saved for Spark (in parquet or any other format), and you have to join them on some column:
SELECT t1.column_name_1
FROM parquet.`data/table1.parquet` as t1
LEFT JOIN parquet.`data/table2.parquet` as t2
ON t2.column_name_2 = t1.column_name_1
WHERE t2.column_name_2 is NULL
LIMIT 1
I am going to launch this query as sparkSession.sql(joinQuery).
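For reference, here is roughly how I launch it (a minimal sketch; the object and app names are arbitrary, the query is the one above):

import org.apache.spark.sql.SparkSession

object JoinQueryLauncher {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("parquet-left-join")   // arbitrary app name
      .getOrCreate()

    // The same query as above; the parquet files are referenced directly in the FROM clause.
    val joinQuery =
      """SELECT t1.column_name_1
        |FROM parquet.`data/table1.parquet` as t1
        |LEFT JOIN parquet.`data/table2.parquet` as t2
        |ON t2.column_name_2 = t1.column_name_1
        |WHERE t2.column_name_2 is NULL
        |LIMIT 1""".stripMargin

    sparkSession.sql(joinQuery).show()

    sparkSession.stop()
  }
}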
My questions:
1) How will Spark join the table1.parquet RDD and the table2.parquet RDD? As I understand it, Spark needs some key by which it performs the shuffling. What would that key be if column_name_1 and column_name_2 each have 1,000,000 unique rows? How many unique keys (partitions) will I get? How many shuffles will I get?
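To illustrate what I'm asking about, here is a minimal sketch of how I would inspect this (the DataFrame calls are just my rewording of the SQL above; as far as I understand, spark.sql.shuffle.partitions, 200 by default, controls the number of shuffle partitions rather than the number of unique key values):

import org.apache.spark.sql.SparkSession

object InspectJoinPlan {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("inspect-join-plan").getOrCreate()

    // Number of partitions used on the shuffle side of SQL joins (200 unless overridden).
    println(spark.conf.get("spark.sql.shuffle.partitions"))

    val t1 = spark.read.parquet("data/table1.parquet")
    val t2 = spark.read.parquet("data/table2.parquet")

    // explain(true) prints the logical and physical plans, including the chosen
    // join strategy (e.g. SortMergeJoin) and the Exchange (shuffle) operators.
    t1.join(t2, t1("column_name_1") === t2("column_name_2"), "left")
      .filter(t2("column_name_2").isNull)
      .limit(1)
      .explain(true)

    spark.stop()
  }
}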
2) What should I do if I can't fit both the table1.parquet and table2.parquet RDDs into memory? There is a pretty trivial solution: just filter the DataFrames before joining, and you will keep everything in RAM. But I'm not sure this will perform well in my case.
Let's say filtering allows you to retrieve table1_subset1 and table1_subset2 from table1. Now, to get the same join result, you need to do more joins. I mean:
table1 JOIN table2 = table1_subset1 JOIN table2 + table1_subset2 JOIN table2
The same applies if I also filter table2:
table1 JOIN table2 = table1_subset1 JOIN table2_subset1 + table1_subset2 JOIN table2_subset1 +
table1_subset1 JOIN table2_subset2 + table1_subset2 JOIN table2_subset2
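To make the decomposition concrete, here is roughly what I mean for two chunks of table1 (the split predicate is just a placeholder and assumes a numeric column; the union corresponds to the "+" above):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object ChunkedJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("chunked-join-sketch").getOrCreate()

    val table1 = spark.read.parquet("data/table1.parquet")
    val table2 = spark.read.parquet("data/table2.parquet")

    // Placeholder split predicate: anything that cuts table1 into disjoint subsets.
    val table1Subset1 = table1.filter(col("column_name_1") <= 500000)
    val table1Subset2 = table1.filter(col("column_name_1") > 500000)

    // table1 JOIN table2 = (table1_subset1 JOIN table2) + (table1_subset2 JOIN table2)
    val joined = table1Subset1
      .join(table2, col("column_name_1") === col("column_name_2"), "left")
      .union(table1Subset2
        .join(table2, col("column_name_1") === col("column_name_2"), "left"))

    joined.explain()
    spark.stop()
  }
}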
Now I have to join about 50 pairs of huge tables, and each of them would have to be split into multiple chunks (subsets), let's say 5 chunks. So instead of 50 joins I would get 50 * 5 * 5 = 1250 filter-and-join operations between chunks, where each chunk is 5 times smaller than the original table (RDD).
Am I right in supposing that performance will degrade a lot? Or is Spark clever enough to perform the same number of shuffles?
Upvotes: 1
Views: 4149
Reputation: 1319
1) You can choose between a shuffle sort join and a shuffle hash join by explicitly setting spark.shuffle.manager (the default is sort). A little more information is here. Hash is more efficient when you have a large number of keys in both tables.
2) To get around this issue, you need to filter your data before reaching this point. It's actually faster to add a prior step where you create DataFrame(s) representing only the subset of data you care about. Spark will also spill to disk by default if your DataFrame(s) don't fit into memory. You can use saveAsTable() in the case of huge DataFrames that consume all of your memory.
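For example, a rough sketch of what I mean (the filter predicates and table name are placeholders, not something from your data):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object FilterBeforeJoin {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("filter-before-join").getOrCreate()

    // Filter each side first so only the relevant subset gets shuffled.
    val t1Subset = spark.read.parquet("data/table1.parquet")
      .filter(col("column_name_1") > 0)          // placeholder predicate
    val t2Subset = spark.read.parquet("data/table2.parquet")
      .filter(col("column_name_2") > 0)          // placeholder predicate

    val joined = t1Subset.join(t2Subset,
      col("column_name_1") === col("column_name_2"), "left")

    // Persist a large intermediate result as a managed table instead of
    // keeping it cached in memory.
    joined.write.mode("overwrite").saveAsTable("joined_result")  // placeholder table name

    spark.stop()
  }
}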
Upvotes: 2