vanhooser

How do I make my highly skewed join complete in Spark SQL?

My join is executing as follows:

SELECT
  left.*, 
  right.*
FROM `/foo/bar/baz` AS left
JOIN `/foo2/bar2/baz2` AS right
ON left.something = right.something

Dataset: /foo/bar/baz

+-----------+-------+
| something | val_1 |
+-----------+-------+
| a         |     1 |
| a         |     2 |
| a         |     3 |
| a         |     4 |
| a         |     5 |
| a         |     6 |
| a         |   ... |
| a         |   10K |
| b         |     1 |
| b         |     2 |
| b         |     3 |
+-----------+-------+

Dataset: /foo2/bar2/baz2

+-----------+-------+
| something | val_2 |
+-----------+-------+
| a         |     1 |
| a         |     2 |
| b         |     1 |
| b         |     2 |
| b         |     3 |
+-----------+-------+

I am getting out-of-memory (OOM) errors on my executors, and I don't want to throw more memory at them unnecessarily. How do I make this join execute successfully without burning extra resources?

Answers (1)

vanhooser

Salting the Join

One tactic for getting this join to execute successfully is what's known as salting the join.

In Spark, a salted join splits the rows of each heavily repeated key in the larger table into smaller groups and explodes the smaller table into an equal number of copies. The join returns the same rows as a normal join (plus the salt column). Each task handles a smaller slice of the larger table, so the risk of OOM errors drops. To salt a join, add a column of random integers from 0 to N - 1 to the left table and make N copies of the right table, one per salt value. Adding this new column to the join condition reduces the largest bucket to roughly 1/N of its previous size.
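The sketch below makes the bucket-size claim concrete. It is a small plain-Python simulation, not Spark code. It gives each left-side row a random salt and counts rows per (key, salt) pair. The 10,000-row hot key mirrors the question's table. The function name `salted_bucket_sizes` and the fixed seed are illustrative assumptions.

```python
import random
from collections import Counter

# Plain-Python simulation of salting, not Spark code. The hot key "a" gets
# 10,000 rows, echoing the skewed table in the question.
left = [("a", i) for i in range(1, 10_001)] + [("b", i) for i in range(1, 4)]

def salted_bucket_sizes(rows, n_salts, seed=0):
    """Count rows per (key, salt) after giving each row a random salt in
    0..n_salts-1, the same range FLOOR(RAND() * N) produces."""
    rng = random.Random(seed)
    return Counter((key, rng.randrange(n_salts)) for key, _ in rows)

unsalted = Counter(key for key, _ in left)
salted = salted_bucket_sizes(left, 8)

print("largest unsalted bucket:", max(unsalted.values()))  # 10000
print("largest salted bucket:", max(salted.values()))      # roughly 10000 / 8
```

Because the salts are random, the "a" rows land in buckets of about 1,250 each rather than exactly 1,250.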

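The key ingredient is the EXPLODE function. EXPLODE emits one output row per array element, so exploding a literal array of salt values effectively cross-joins every right-hand row with those values: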

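SELECT
  left.*,
  right.*
FROM
  (
    SELECT
      *,
      -- random salt in 0..7 for every row of the large, skewed table
      FLOOR(RAND() * 8) AS salt
    FROM `/foo/bar/baz`
  ) AS left
JOIN
  (
    SELECT
      *,
      -- one copy of every row for each salt value 0..7
      EXPLODE(ARRAY(0, 1, 2, 3, 4, 5, 6, 7)) AS salt
    FROM `/foo2/bar2/baz2`
  ) AS right
ON
  left.something = right.something
  -- the salt range must match on both sides (0..7 here)
  AND left.salt = right.salt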
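The sketch below is a sanity check on the salted query. It models the query in plain Python, not Spark, using only the rows the question lists explicitly: the first six `a` rows and the three `b` rows of `/foo/bar/baz`, plus all of `/foo2/bar2/baz2`. It assumes an inner join and leaves the salt column out of the output. `plain_join` and `salted_join` are hypothetical helper names.

```python
import random

# Plain-Python model of the unsalted and salted queries, not Spark itself.
# Rows are (something, value) tuples taken from the question's tables.
left = [("a", v) for v in range(1, 7)] + [("b", v) for v in (1, 2, 3)]
right = [("a", 1), ("a", 2), ("b", 1), ("b", 2), ("b", 3)]

N = 8  # salt factor; must be identical on both sides

def plain_join(l_rows, r_rows):
    """Inner join on the key only."""
    return sorted((lk, lv, rv) for lk, lv in l_rows for rk, rv in r_rows if lk == rk)

def salted_join(l_rows, r_rows, n, seed=0):
    """Inner join on key AND salt, then drop the salt from the output."""
    rng = random.Random(seed)
    # Left side: one random salt per row, like FLOOR(RAND() * N).
    l_salted = [(k, v, rng.randrange(n)) for k, v in l_rows]
    # Right side: one copy per salt value, like EXPLODE(ARRAY(0, ..., N-1)).
    r_exploded = [(k, v, s) for k, v in r_rows for s in range(n)]
    return sorted(
        (lk, lv, rv)
        for lk, lv, ls in l_salted
        for rk, rv, rs in r_exploded
        if lk == rk and ls == rs
    )

assert salted_join(left, right, N) == plain_join(left, right)
print(len(plain_join(left, right)))  # 6 * 2 + 3 * 3 = 21
```

Every left row finds exactly one matching copy of each right row with the same key, so the salted result matches the plain join row for row.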

Tuning

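  • How do you choose the factor to explode by? Educated guessing, mostly. Trying powers of 2 (8, 16, 32) is a good way to find the right ballpark.
  • Another approach is to watch the row counts per executor while your unsalted job runs. If a few executors (or tasks) process far more rows than the rest, that ratio gives you a rough starting value for N.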
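Per-key row counts (for example from a `GROUP BY something` with `COUNT(*)`) can turn the educated guess into a starting point. The heuristic below is hypothetical, not an established rule. It picks the smallest power of 2 that brings the hottest key down to a target number of rows per bucket. Both `suggest_salt_factor` and the target value are assumptions you would tune for your own cluster.

```python
import math

def suggest_salt_factor(key_counts, target_rows_per_bucket, max_factor=1024):
    """Hypothetical heuristic: smallest power of 2 that brings the largest
    key's row count down to about target_rows_per_bucket, capped at max_factor."""
    hottest = max(key_counts.values())
    needed = math.ceil(hottest / target_rows_per_bucket)
    factor = 1
    while factor < needed and factor < max_factor:
        factor *= 2
    return factor

# Per-key counts shaped like the question's data (hot key "a").
counts = {"a": 10_000, "b": 3}
print(suggest_salt_factor(counts, target_rows_per_bucket=1_000))  # 16
```

The cap keeps the right-hand table from being copied an absurd number of times when one key is extremely hot.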

What to Watch Out For

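  • Watch for off-by-one errors when salting a join. In an inner join, any left row whose salt value has no matching exploded copy on the right is silently dropped, so you lose a fraction of your records.
  • RAND() returns values in [0, 1). CEIL(RAND() * N) therefore gives you integers from 1 to N (0 only if RAND() returns exactly 0), while FLOOR(RAND() * N) gives you integers from 0 to N - 1. Make sure you explode the matching set of numbers in your salted join!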
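The simulation below shows what a CEIL/FLOOR mismatch costs. It is plain Python standing in for Spark. Python's `random.random()` plays the role of RAND(), since both draw from [0, 1). The row count and seed are arbitrary.

```python
import math
import random

# Simulation, not Spark: 10,000 left rows for one key, salt factor N = 8.
N = 8
rng = random.Random(42)
draws = [rng.random() for _ in range(10_000)]  # like RAND(): 0 <= x < 1

floor_salts = {math.floor(x * N) for x in draws}
ceil_salts = {math.ceil(x * N) for x in draws}
exploded = set(range(N))  # like EXPLODE(ARRAY(0, ..., N-1))

print(sorted(floor_salts))  # [0, 1, 2, 3, 4, 5, 6, 7]: every salt has a match
print(sorted(ceil_salts))   # [1, ..., 8]: salt 8 has no exploded copy

# Fraction of left rows an inner join would silently drop with CEIL + 0..N-1.
lost = sum(1 for x in draws if math.ceil(x * N) not in exploded) / len(draws)
print(f"{lost:.1%} of rows lost")  # roughly 1/N, about 12.5%
```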

The Overhead of Salt

  • Salting a join does not necessarily make your build faster. It just gives it a better chance of succeeding.
  • If you salt your joins unnecessarily, you may actually start seeing declines in performance, because the right table is copied N times and all of those copies have to be shuffled.
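The overhead comes from the right side. EXPLODE copies every right-hand row N times, while the hot key's largest bucket only shrinks by about a factor of N. The sketch below puts numbers on that trade-off. The table sizes are hypothetical, and it assumes the salts spread the hot key perfectly evenly, which random salts only approximate. `salting_tradeoff` is an illustrative name.

```python
def salting_tradeoff(hot_key_rows, left_rows, right_rows, n):
    """Return (largest left bucket, total rows entering the join) for salt
    factor n, assuming the hot key is spread evenly across the n salts."""
    largest_bucket = -(-hot_key_rows // n)   # ceiling division
    total_rows = left_rows + right_rows * n  # EXPLODE copies the right side n times
    return largest_bucket, total_rows

# Hypothetical table sizes, chosen for illustration only.
for n in (1, 8, 32, 256):
    bucket, total = salting_tradeoff(
        hot_key_rows=800_000, left_rows=1_000_000, right_rows=50_000, n=n
    )
    print(f"N={n}: largest bucket {bucket:,} rows, {total:,} rows into the join")
```

Past a certain N, the largest bucket is already small enough to fit in memory, and further salting only adds right-side copies.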
