Reputation: 1163
I am reading High Performance Spark, where the author introduces a technique for joining highly skewed data: filter the smaller dataset down to just the most common keys and collect those rows into a HashMap. The HashMap is then broadcast to every executor to perform a broadcast join, and the result is combined with the regular join of the remaining data via a union at the very end.
Apologies in advance, but the text does not give a code example of this technique, so the sketch further down is my own reconstruction rather than the author's code.
Text follows.
Sometimes not all of our smaller RDD will fit into memory, but some keys are so overrepresented in the large dataset that you want to broadcast just the most common keys. This is especially useful if one key is so large that it can't fit on a single partition. In this case you can use countByKeyApprox on the large RDD to get an approximate idea of which keys would most benefit from a broadcast. You then filter the smaller RDD for only these keys, collecting the result locally in a HashMap. Using sc.broadcast you can broadcast the HashMap so that each worker only has one copy and manually perform the join against the HashMap. Using the same HashMap you can then filter your large RDD down to not include the large number of duplicate keys and perform your standard join, uniting it with the result of your manual join. This approach is quite convoluted but may allow you to handle highly skewed data you couldn't otherwise process.
For those who don't know, a broadcast join is a technique that avoids the shuffle normally incurred when joining two datasets by sending the smaller dataset to every executor, which then performs the join locally. The idea is that the shuffle is so expensive that having each executor perform the join and discard the rows it doesn't need is sometimes the better trade-off.
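For reference, in the DataFrame API the standard way to request this is the broadcast hint (the DataFrame names here are just placeholders):

```scala
import org.apache.spark.sql.functions.broadcast

// Ask Spark to ship smallDF to every executor so the join is performed
// map-side, without shuffling largeDF.
val joined = largeDF.join(broadcast(smallDF), "key")
```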
The text describes a situation where a subset of the data (the rows with the most common keys) is extracted and joined using a broadcast join; the result is then unioned with the standard join of the remaining data.
The reason this might be necessary is that excessive shuffling can usually be avoided by co-locating the rows that share a key in both datasets in the same partition, so that a single executor handles each key. However, a single key can be so overrepresented that its rows do not fit in one partition. In that case, the author suggests collecting the smaller dataset's rows for the overrepresented keys into a HashMap and performing a broadcast join for just those keys.
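Here is a rough sketch of how I picture the technique in code. This is my own guess, not the author's code; the RDDs, their types, the number of keys to broadcast, and the 10-second timeout passed to countByKeyApprox are all placeholders I made up:

```scala
import org.apache.spark.rdd.RDD

// Hypothetical inputs: a heavily skewed largeRDD and a smallRDD that does not
// entirely fit in driver/executor memory, so only part of it is broadcast.
def skewedJoin(
    largeRDD: RDD[(String, Long)],
    smallRDD: RDD[(String, String)],
    numTopKeys: Int): RDD[(String, (Long, String))] = {
  val sc = largeRDD.sparkContext

  // 1. Approximate key counts on the large RDD (wait up to 10 seconds).
  val approxCounts = largeRDD.countByKeyApprox(10000L).getFinalValue()

  // 2. Keep only the most overrepresented keys.
  val topKeys: Set[String] = approxCounts.toSeq
    .sortBy { case (_, count) => -count.mean }
    .take(numTopKeys)
    .map { case (key, _) => key }
    .toSet

  // 3. Collect the small RDD's rows for those keys into a local map and broadcast it.
  val topKeyMap: Map[String, String] =
    smallRDD.filter { case (key, _) => topKeys.contains(key) }.collectAsMap().toMap
  val broadcastMap = sc.broadcast(topKeyMap)

  // 4. Manual map-side join against the broadcast HashMap: only the skewed keys
  //    produce output here, and no shuffle is needed.
  val skewedPart: RDD[(String, (Long, String))] = largeRDD.flatMap { case (key, value) =>
    broadcastMap.value.get(key).map(small => (key, (value, small)))
  }

  // 5. Standard shuffle join for the remaining keys, then union the two results.
  val regularPart = largeRDD.filter { case (key, _) => !topKeys.contains(key) }.join(smallRDD)
  skewedPart.union(regularPart)
}
```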
Is this a good idea? Moreover, a technique like this seems very situational, so I assume Catalyst does not apply it automatically. Is that correct? If so, does that mean that on highly skewed data this RDD technique can beat Catalyst operating on DataFrames or Datasets?
Upvotes: 1
Views: 66