joshsuihn

Reputation: 830

Efficient grouping by key using mapPartitions or partitioner in Spark

So, I have data like the following,

[ (1, data1), (1, data2), (2, data3), (1, data4), (2, data5) ]

which I want to convert to the following, for further processing.

[ (1, [data1, data2, data4]), (2, [data3, data5]) ]

I used groupByKey and reduceByKey, but they fail because of the really large amount of data. The data is not tall but wide: keys range from 1 up to 10000, but each value list ranges from 100k to 900k elements.
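
For reference, this is roughly what I tried, assuming the pairs sit in an RDD[(Int, String)] called rdd (the name is just for illustration):

    // both of these build (key, all values for that key) but fail with the shuffle error below
    val grouped = rdd.groupByKey()
    val reduced = rdd.mapValues(v => List(v)).reduceByKey(_ ++ _)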

I am struggling with this issue and plan to apply mapPartitions or a (Hash)Partitioner.

So, if either of these could work, I'd like to know:

  1. Using mapPartitions, could you please give a code snippet?
  2. Using a (Hash)Partitioner, could you please give an example of how to control partitions by some element such as the key? For example, is there a way to create each partition based on the key (i.e. 1, 2, ... above) with no need to shuffle? What I have in mind is roughly sketched below.
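
(I am not sure the following actually avoids the shuffle; the partition count of 100 and the variable names are just placeholders.)

    import org.apache.spark.HashPartitioner

    // place all records sharing a key into the same partition
    val partitioned = rdd.partitionBy(new HashPartitioner(100))

This is the exception I currently get: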

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 9 (flatMap at TSUMLR.scala:209) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1
        at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:542)
        at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:538)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:538)
        at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:155)
        at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:47)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Upvotes: 6

Views: 3662

Answers (1)

zero323

Reputation: 330393

None of the proposed methods would work. A partitioner by definition has to shuffle the data and will suffer from the same limitations as groupByKey. mapPartitions cannot move data to another partition, so it is completely useless here. Since your description of the problem is rather vague, it is hard to give specific advice, but in general I would try the following steps:

  • try to rethink the problem. Do you really need all the values at once? How do you plan to use them? Can you obtain the same results without collecting everything for a key into a single partition?
  • is it possible to reduce the traffic? How many unique values do you expect? Is it possible to compress the data before the shuffle, for example by counting values or using RLE (a sketch of this appears after the list)?
  • consider using larger executors. Spark has to keep in memory only the values for a single key and can spill processed keys to disk.
  • split your data by key:

    // collect the distinct keys to the driver (small here: at most ~10000 keys)
    val keys = rdd.keys.distinct.collect
    // build one filtered RDD per key, so each key can be processed without grouping
    val rdds = keys.map(k => rdd.filter(_._1 == k))
    

    and process each RDD separately, as sketched below.
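
One way to handle that last step, assuming the keys and rdds arrays from above (the output path is only a placeholder):

    // keys and rdds are aligned, so zip pairs each per-key RDD with its key;
    // each RDD is then written out independently, without any grouping
    keys.zip(rdds).foreach { case (k, perKeyRdd) =>
      perKeyRdd.values.saveAsTextFile(s"/tmp/output/key=$k")
    }

And a rough sketch of the compression idea from the second point, assuming duplicate values are common and the pairs sit in an RDD[(Int, String)] called rdd:

    // reduceByKey combines map-side, so only one ((key, value), count)
    // record per distinct pair is sent over the network
    val counted = rdd
      .map { case (k, v) => ((k, v), 1L) }
      .reduceByKey(_ + _)
      .map { case ((k, v), count) => (k, (v, count)) }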

Upvotes: 6
