Reputation: 53
I'm writing a Spark program in Scala that counts occurrences per key. Here is a sample of the data:
Name  Fruit   Place
A     apple   China
A     apple   China
A     apple   U.S
A     banana  U.K
B     apple   Japan
B     orange  Chile
C     apple   French
It's a DataFrame with many columns, but I only care about the three columns above, so there may be repeated records. I would like to count, for example, the number of production places for each fruit eaten by A.
import scala.collection.mutable.{ArrayBuffer, Map}  // mutable Map so ++= mutates in place
val res = data.select("name", "fruit", "place").rdd
  .map(r => ((r.getString(0), r.getString(1)), ArrayBuffer(r.getString(2))))  // key: (name, fruit)
  .reduceByKey((a, b) => a ++= b)                                             // gather all places per (name, fruit)
  .map { case ((name, fruit), places) => (name, Map(fruit -> places.toSet.size)) }
  .reduceByKey((a, b) => a ++= b)                                             // merge per-fruit counts per name
I first select the columns I need, then use ("name", "fruit") as the key to collect the production places into one ArrayBuffer for each kind of fruit eaten by each person. Then I use "name" as the key to collect the number of production places for each fruit into a map like {"apple": 2}. So the result is, informally, of the form RDD[("name", Map("fruit" -> "places count"))].
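For reference, the first stage can also be expressed with the DataFrame API alone. This is only a rough sketch under the same column names, using the built-in countDistinct from org.apache.spark.sql.functions:

// Sketch: distinct place count per (name, fruit) with the DataFrame API.
import org.apache.spark.sql.functions.countDistinct

val placeCounts = data
  .select("name", "fruit", "place")
  .groupBy("name", "fruit")
  .agg(countDistinct("place").as("placeCount"))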
In the program I do this kind of aggregation about three times to calculate similar statistics, for example the number of different fruits eaten by each person in each production place.
The data is about 80 GB and I run the job on 50 executors, each with 4 cores and 24 GB of memory. The data is also repartitioned into 200 partitions, so I expected the job to finish quite quickly. However, it ran for more than a day and then failed with org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 10 and java.lang.OutOfMemoryError: GC overhead limit exceeded.
I did a lot of things to optimize this program, such as tuning spark.mesos.executor.memoryOverhead and using mutable maps to minimize the GC cost of frequently creating and discarding objects. I even tried to use reduceByKey to move the data with the same key into one partition to boost performance, but it helped little. The code looks like:
val new_data = data.rdd
  .map(r => (r.getAs[String]("name"), ArrayBuffer((r.getAs[String]("fruit"), r.getAs[String]("place")))))
  .reduceByKey((a, b) => a ++= b)   // one buffer of (fruit, place) pairs per name
  .cache()
Then I don't need to shuffle the data each time I do a similar calculation, and the later work can be done on top of new_data. However, this optimization didn't seem to help.
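For example, the per-fruit place counts described above can be derived from new_data without another wide shuffle. A rough sketch (placeCountPerFruit is just an illustrative name):

// Sketch: distinct place count per fruit, computed locally within each name's buffer.
val placeCountPerFruit = new_data.mapValues { pairs =>
  pairs.groupBy { case (fruit, _) => fruit }
       .map { case (fruit, fp) => fruit -> fp.map(_._2).toSet.size }
}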
Finally, I found that about 50% of the records have the same value in the "name" field, say "H". I removed the records with name "H" and the job finished in 1 hour.
Here are my questions:
1. Why does the distribution of keys have such a great impact on the performance of reduceByKey? By "distribution" I mean the number of occurrences of each key. In my case the data is not that big, but one key dominates it, so the performance is greatly affected. I assume this is a problem with reduceByKey; am I wrong?
2. If I have to keep the records with name "H", how can I avoid the performance issue?
3. Is it possible to use reduceByKey to repartition the data and put the records with the same key ("name") into one partition?
4. Does it really help to move the records with the same key ("name") into one partition? I know it may cause memory issues, but I have to run similar code several times in the program, so I guess it may help the later work. Am I right?
Thanks for your help!
Upvotes: 0
Views: 818
Reputation: 1
What you can do to avoid the big shuffle is to first build a DataFrame that maps each fruit to its set of places.
import org.apache.spark.sql.functions.collect_set
val fruitToPlaces = data.groupBy("fruit").agg(collect_set("place").as("places"))
This DataFrame should be small (i.e. it fits in memory).
Run fruitToPlaces.cache.count to materialize and cache it, and to make sure it is indeed small.
Then you do a join on fruit.
data.join(fruitToPlaces, Seq("fruit"), "left_outer")
Spark should be smart enough to do a broadcast hash join (rather than a shuffle join).
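If the automatic broadcast does not kick in (it is controlled by spark.sql.autoBroadcastJoinThreshold), you can hint it explicitly. A sketch, assuming fruitToPlaces comfortably fits in executor memory:

// Sketch: force the small side to be broadcast.
import org.apache.spark.sql.functions.broadcast

val joined = data.join(broadcast(fruitToPlaces), Seq("fruit"), "left_outer")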
Upvotes: 0