Franco Ruggeri

Reputation: 23

Spark Broadcast Variable with only one task using it

To my understanding, Spark works like this: every variable referenced in a closure is shipped with each task that uses it, whereas a broadcast variable is sent only once to each worker.

Is there any advantage to using a broadcast variable instead of a standard variable when we know it is used only once, so there would be only one transfer even with a standard variable?

Example (Java):

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class SparkDriver {
    public static void main(String[] args) {
        String inputPath = args[0];
        String outputPath = args[1];

        Map<String,String> dictionary = new HashMap<>();
        dictionary.put("J", "Java");
        dictionary.put("S", "Spark");

        SparkConf conf = new SparkConf()
                .setAppName("Try BV")
                .setMaster("local");

        try (JavaSparkContext context = new JavaSparkContext(conf)) {
            final Broadcast<Map<String,String>> dictionaryBroadcast = context.broadcast(dictionary);

            context.textFile(inputPath)
                   .map(line -> {                // just one transformation using BV
                       Map<String,String> d = dictionaryBroadcast.value();
                       String[] words = line.split(" ");
                       StringBuilder sb = new StringBuilder();
                       for (String w : words)
                           sb.append(d.get(w)).append(" ");
                       return sb.toString();
                   })
                   .saveAsTextFile(outputPath);  // just one action!
        }
    }
}

Upvotes: 1

Views: 1362

Answers (2)

eliax1996

Reputation: 106

There are several advantages to using broadcast variables, even if you use the variable only once:

  1. You avoid several serialization problems. When you serialize an anonymous inner class that uses a field of the enclosing class, the enclosing class gets serialized along with it. Spark's ClosureCleaner partially mitigates this, but it doesn't always do the trick. You can work around the resulting NotSerializableExceptions with tricks such as copying the member variable into a local variable before the closure, or turning the anonymous inner class into a named class whose constructor takes only the required fields. With a broadcast variable you don't even have to think about that: only the required variable is serialized. I suggest reading the not-serializable-exception question and its first answer to understand the concept more deeply.

  2. Serializing the closure is, most of the time, slower than a specialized serialization method. As the official Spark data-serialization documentation says:

    Kryo is significantly faster and more compact than Java serialization (often as much as 10x).

    Browsing the Spark classes in the official repo, I saw that the closure is serialized through the variable SparkEnv.get.closureSerializer. The only assignment of that variable, at line 306 of the SparkEnv class, uses the standard and inefficient JavaSerializer. So if you serialize a big object inside the closure, you lose performance to network bandwidth. This may also explain why the official docs recommend switching to broadcast variables for tasks larger than about 20 KiB.

  3. There is only one copy per machine, which is an advantage when several executors run on the same physical machine.

     > Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks
    
  4. The distribution algorithm is probably much more efficient. Thanks to the immutability of a broadcast variable, it is not hard to imagine distributing it with a peer-to-peer algorithm instead of a centralized one: for example, once the driver has finished sending the broadcast variable to the first executor, that executor can forward the data to a third while the driver sends it to the second, and so on. Picture kindly provided by the BitTorrent Wikipedia page:

BitTorrent distribution algorithm
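Point 1 can be demonstrated with plain JDK serialization, no Spark needed. This is a minimal sketch; `ClosureDemo`, `SerFunction`, `Driver`, and the field names are invented for illustration:

```java
import java.io.*;
import java.util.function.Function;

public class ClosureDemo {
    // a serializable function type, standing in for Spark's serializable closures
    interface SerFunction<T, R> extends Function<T, R>, Serializable {}

    // NOT Serializable, like a typical driver class
    static class Driver {
        String prefix = "J->";

        SerFunction<String, String> badClosure() {
            // reading the field captures 'this', so serializing the lambda
            // drags in the whole non-serializable Driver instance
            return s -> prefix + s;
        }

        SerFunction<String, String> goodClosure() {
            final String p = prefix;  // copy the member into a local variable
            return s -> p + s;        // now only a Serializable String is captured
        }
    }

    static void serialize(Object o) throws IOException {
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(o);
        }
    }

    public static void main(String[] args) throws Exception {
        Driver d = new Driver();
        try {
            serialize(d.badClosure());
            System.out.println("bad closure: serialized");
        } catch (NotSerializableException e) {
            System.out.println("bad closure: NotSerializableException");
        }
        serialize(d.goodClosure());
        System.out.println("good closure: serialized");
    }
}
```

The local-variable copy is exactly the trick mentioned above; a broadcast variable makes it unnecessary because only the broadcast value itself is serialized.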

I haven't dug into Spark's implementation, but as the documentation of broadcast variables says:

Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.

Surely an algorithm more efficient than the trivial centralized one can be designed by exploiting the immutability of the broadcast variable.
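Point 2's size claim can also be sketched with the JDK alone: compare generic Java serialization of a dictionary (what the closure serializer uses) against a hand-rolled, specialized encoding. All class and variable names here are illustrative:

```java
import java.io.*;
import java.util.*;

public class SerializationSizeDemo {
    public static void main(String[] args) throws Exception {
        Map<String, String> dict = new HashMap<>();
        for (int i = 0; i < 1000; i++) dict.put("key" + i, "value" + i);

        // generic Java serialization: class descriptors, type tags, handles
        ByteArrayOutputStream javaBytes = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(javaBytes)) {
            oos.writeObject(dict);
        }

        // specialized encoding: just the entry count and the raw strings
        ByteArrayOutputStream rawBytes = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(rawBytes)) {
            dos.writeInt(dict.size());
            for (Map.Entry<String, String> e : dict.entrySet()) {
                dos.writeUTF(e.getKey());
                dos.writeUTF(e.getValue());
            }
        }

        System.out.println("Java serialization  : " + javaBytes.size() + " bytes");
        System.out.println("specialized encoding: " + rawBytes.size() + " bytes");
        System.out.println("java bigger: " + (javaBytes.size() > rawBytes.size()));
    }
}
```

A real broadcast variable can go further still, since Spark lets you plug in Kryo for broadcast data, while the closure path is stuck with the Java serializer.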

Long story short: using a closure and using a broadcast variable are not the same thing. If the object you are sending is big, use a broadcast variable.

Upvotes: 1

Ged
Ged

Reputation: 18023

Please refer to this excellent article: https://www.mikulskibartosz.name/broadcast-variables-and-broadcast-joins-in-apache-spark/ I could rewrite it here, but it serves the purpose well and answers your question.

In summary:

A broadcast variable is an Apache Spark feature that lets us send a read-only copy of a variable to every worker node in the Spark cluster.

The broadcast variable is useful only when we want to:

  1. Reuse the same variable across multiple stages of the Spark job
  2. Speed up joins by broadcasting a small table to every worker node (rather than shipping a copy to every executor).
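The broadcast-join idea in point 2 can be sketched without Spark: ship the small table to every worker once, then join each partition of the big table against that local copy, with no shuffle of the big table. A plain-Java sketch; all names and data are illustrative:

```java
import java.util.*;
import java.util.stream.*;

public class BroadcastJoinSketch {
    public static void main(String[] args) {
        // small dimension table, "broadcast" to every worker as a local map
        Map<Integer, String> smallTable = Map.of(1, "Java", 2, "Spark");

        // one partition of the large fact table: (key, value) rows
        List<int[]> bigTable = List.of(
                new int[]{1, 100}, new int[]{2, 200}, new int[]{1, 300});

        // map-side (broadcast) join: probe the local copy for each row
        List<String> joined = bigTable.stream()
                .map(row -> smallTable.get(row[0]) + ":" + row[1])
                .collect(Collectors.toList());

        System.out.println(joined);
    }
}
```

In Spark the same shape appears as a `map` over the big RDD/DataFrame that probes `broadcastVar.value()`, or as an automatic broadcast hash join in Spark SQL when one side is small enough.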

Upvotes: 1
