Al Jenssen

Reputation: 695

Efficient union of multiple small RDDs

I have a sequence of many small files (~1-8 KB each) and I want to calculate the word count over these files. Specifically, the sequence I have is files: Seq[String], where each string in the sequence is the path to a file. This is how I calculate the total word count based on that sequence:

val totalWordCount = sc.union(
      files.map(path => sc.textFile(path))
    ).flatMap(line => line.split(" "))
      .map((_,1))
      // I use a hash partitioner for better performance of reduceByKey
      .partitionBy(new HashPartitioner(numPartitions))
      .reduceByKey(_ + _)

The problem I am having is that even with more than 10000 small files, when I use the above technique the execution time increases as I increase the number of partitions. Why is that?

Note that I cannot merge these small files into one from the beginning and as an input I need to have the Sequence of strings.

Upvotes: 1

Views: 1231

Answers (1)

Tim

Reputation: 3725

Why it's slow

sc.textFile is not optimized for this case. Remember that the optimal partition size is on the order of 100 MB, and right now your sc.union RDD is getting one partition per file -- under 8 KB each. Spark's per-partition overhead is going to absolutely dominate anything you do in this paradigm.
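As a quick sanity check (a minimal sketch; unioned is just an illustrative intermediate, not something from your code), you can confirm that the unioned RDD really does get one tiny partition per file:

val unioned = sc.union(files.map(path => sc.textFile(path)))
// With ~10000 files of 1-8 KB each, expect ~10000 partitions of a few KB each,
// so task-scheduling overhead dwarfs the actual word counting.
println(s"files: ${files.size}, partitions: ${unioned.getNumPartitions}")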

You mentioned "increasing the partitions" in your question, but I think here you probably want to reduce the number of partitions. I'm not sure where numPartitions came from, but it should be roughly total data size / 100 MB. Your .partitionBy step performs a full shuffle, so there will still be lots of overhead from the original too-many-partitions RDD, but it will likely perform better downstream.
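If you want a concrete way to derive that number, here is a rough sketch, assuming the paths are resolvable through Hadoop's FileSystem API (the 100 MB figure is just the rule of thumb above):

import org.apache.hadoop.fs.Path

// Sum the sizes of the input files and aim for roughly 100 MB per partition.
val hadoopConf = sc.hadoopConfiguration
val totalBytes = files.map { p =>
  val path = new Path(p)
  path.getFileSystem(hadoopConf).getFileStatus(path).getLen
}.sum
val optimalNPartitions = math.max(1, (totalBytes / (100L * 1024 * 1024)).toInt)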

Another execution model to try

Here's something else to try: a no-shuffle coalesce on the union:

val optimalNPartitions = ??? // calculate total size / 100MB here
val totalWordCount = sc.union(files.map(path => sc.textFile(path)))
  .flatMap(line => line.split(" "))
  .coalesce(optimalNPartitions, shuffle = false) // try with shuffle = true as well!
  .map((_,1))
  .reduceByKey(_ + _)

One final note

While you say you're partitioning with a new HashPartitioner to make reduceByKey more efficient, this actually works against you.

Let's look at the two models. First, the one you have: partitionBy followed by reduceByKey. The partitionBy step does a full shuffle against a new hash partitioner, so all the data has to move across the network. When you then call reduceByKey, all the like keys are already in the same place, so no shuffle needs to happen.

Second, leave out partitionBy and just call reduceByKey. In this model, you come into the reduce with no partitioner, so you have to shuffle. But before each key is shuffled, it is reduced locally: if you had the word "dog" 100 times on one partition, you're going to shuffle ("dog", 100) once instead of ("dog", 1) 100 times. See where I'm going with this? reduceByKey actually requires only a partial shuffle, whose size is determined by the sparsity of keys (if you only have a few unique keys, very little is shuffled; if everything is unique, everything is shuffled).

Clearly model 2 is what we want. Get rid of that partitionBy!
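To make that concrete, here is a minimal sketch of the two models side by side (words stands for a hypothetical RDD[(String, Int)] of (word, 1) pairs, and optimalNPartitions is the value computed above):

import org.apache.spark.HashPartitioner

// Model 1: partitionBy shuffles every single (word, 1) pair, then reduces shuffle-free.
val model1 = words
  .partitionBy(new HashPartitioner(optimalNPartitions))
  .reduceByKey(_ + _)

// Model 2: reduceByKey combines map-side first, then shuffles only the partial sums
// (e.g. ("dog", 100) is shuffled once instead of ("dog", 1) a hundred times).
val model2 = words.reduceByKey(_ + _)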

Upvotes: 3
