Reputation: 695
I have a sequence of many small files (~1-8 KB each) and I want to calculate the word count across them. Specifically, the sequence I have is files: Seq[String], where each string is the path to one file. This is how I calculate the total word count from that sequence:
import org.apache.spark.HashPartitioner

val totalWordCount = sc.union(files.map(path => sc.textFile(path)))
  .flatMap(line => line.split(" "))
  .map((_, 1))
  // I use a hash partitioner for better performance of reduceByKey
  .partitionBy(new HashPartitioner(numPartitions))
  .reduceByKey(_ + _)
The problem is that, even with more than 10,000 small files, the execution time of the code above goes up when I increase the number of partitions. Why is that?
Note that I cannot merge these small files into one beforehand; the input has to be the sequence of path strings.
Upvotes: 1
Views: 1231
Reputation: 3725
sc.textFile is not optimized for this case. Remember that the optimal partition size is on the order of 100 MB, and right now your sc.union RDD is getting at least one partition per file, each under 8 KB. Spark's per-partition overhead is going to absolutely dominate anything you do in this paradigm.
You mentioned "increasing the partitions" in your question, but here you probably want to reduce the number of partitions. I'm not sure where numPartitions came from, but it should be roughly total data size / 100 MB. Your .partitionBy step performs a full shuffle, so there will still be a lot of overhead from the original too-many-partitions RDD, but it will likely perform better downstream.
Here's something else to try: a no-shuffle coalesce on the union:
val optimalNPartitions = ??? // calculate total size / 100 MB here
val totalWordCount = sc.union(files.map(path => sc.textFile(path)))
  .flatMap(line => line.split(" "))
  .coalesce(optimalNPartitions, shuffle = false) // try with shuffle = true as well!
  .map((_, 1))
  .reduceByKey(_ + _)
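For the "total size / 100 MB" calculation, here is a minimal sketch in plain Scala. It is only one way to do it, and it rests on assumptions that are not in the question: the names PartitionEstimate, partitionsFor and totalSize are made up for illustration, and totalSize assumes every path points to a readable file on the local filesystem (for HDFS or S3 paths you would have to get the sizes through the Hadoop FileSystem API instead).

```scala
import java.nio.file.{Files, Paths}

object PartitionEstimate {
  // Target partition size, taken from the ~100 MB rule of thumb above.
  val TargetPartitionBytes: Long = 100L * 1024 * 1024

  // Pure arithmetic: ceil(totalBytes / targetBytes), but never fewer than 1 partition.
  def partitionsFor(totalBytes: Long, targetBytes: Long = TargetPartitionBytes): Int = {
    require(totalBytes >= 0 && targetBytes > 0, "sizes must be non-negative / positive")
    math.max(1L, (totalBytes + targetBytes - 1) / targetBytes).toInt
  }

  // ASSUMPTION: all paths are local files; Files.size throws if one is missing.
  def totalSize(files: Seq[String]): Long =
    files.map(path => Files.size(Paths.get(path))).sum
}
```

You would then write something like val optimalNPartitions = PartitionEstimate.partitionsFor(PartitionEstimate.totalSize(files)). Note that 10,000 files of at most 8 KB add up to roughly 80 MB, so under this rule of thumb the estimate comes out as a single partition.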
You say you're repartitioning with a new hash partitioner to make reduceByKey more efficient, but this actually makes things worse.
Let's look at the two models. First, the one you had: partitionBy followed by reduceByKey. The partitionBy step does a full shuffle against a new hash partitioner, so all the data needs to move across the network. When you then call reduceByKey, all the like keys are already in the same place, so no further shuffle needs to happen.
Second, leave out partitionBy and just call reduceByKey. In this model, you come into the reduceByKey with no partitioner, so you have to shuffle. But before you shuffle each key, you're going to reduce locally: if you had the word "dog" 100 times on one partition, you're going to shuffle ("dog", 100) once instead of ("dog", 1) 100 times. See where I'm going with this? reduceByKey actually requires only a partial shuffle, whose size is determined by how many distinct keys each partition holds. If you only have a few unique keys, very little is shuffled; if everything is unique, everything is shuffled.
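To make the difference concrete, here is a toy model in plain Scala that counts how many records each approach would have to send over the network. It is a sketch using ordinary collections, not Spark's actual shuffle implementation; the ShuffleSketch object and its method names are hypothetical, and each inner Seq stands in for the words held by one partition.

```scala
object ShuffleSketch {
  // One partition is modelled as the list of words it holds.
  type Partition = Seq[String]

  // Model 1 (partitionBy first): every (word, 1) pair is shuffled as-is.
  def recordsShuffledWithoutCombine(partitions: Seq[Partition]): Int =
    partitions.map(_.size).sum

  // Model 2 (reduceByKey only): each partition pre-aggregates,
  // so it sends one record per distinct word it holds.
  def recordsShuffledWithCombine(partitions: Seq[Partition]): Int =
    partitions.map(_.distinct.size).sum

  // The locally reduced records for a single partition, e.g. "dog" -> 100.
  def combineLocally(partition: Partition): Map[String, Int] =
    partition.groupBy(identity).map { case (word, occurrences) => (word, occurrences.size) }
}
```

With one partition holding "dog" 100 times and another holding "dog", "cat", "cat", the first count is 103 records and the second is 3. If every word were unique, the two counts would be equal, which is the "everything is shuffled" case.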
Clearly model 2 is what we want. Get rid of that partitionBy!
Upvotes: 3