Reputation: 71
I am struggling with an OutOfMemoryError in Spark that is thrown during a repartition. The program performs the following steps:
JavaRDD<A> data = sc.objectFile(this.inSource);
JavaPairRDD<String, A> dataWithKey = data.mapToPair(d -> new Tuple2<>(d.getUjid(), d));
JavaPairRDD<ADesc, AStats> dataInformation = dataWithKey.groupByKey()
.flatMapToPair(v -> getDataInformation(v._2()));
dataInformation.groupByKey().repartition(PARTITIONS).map(v -> merge(v._1(), v._2()));
getDataInformation maps a group of datapoints with the same id to several new datapoints:
Iterable<Tuple2<ADesc, AStats>> getDataInformation(Iterable<A> aIterator)
E.g.: (ID1, Data1), (ID1,Data2), (ID1,Data3) -> (Data_Description_1, Stats1), (Data_Description_2, Stats2)
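For illustration, a simplified sketch of what getDataInformation does; the grouping criterion (getType) and the stats update are placeholders for the real logic:
// Sketch only: ADesc, AStats, getType() and update() stand in for the real domain classes.
Iterable<Tuple2<ADesc, AStats>> getDataInformation(Iterable<A> dataPoints) {
    Map<ADesc, AStats> grouped = new HashMap<>();
    for (A a : dataPoints) {
        ADesc desc = new ADesc(a.getType());                      // placeholder grouping criterion
        AStats stats = grouped.computeIfAbsent(desc, k -> new AStats());
        stats.update(a);                                          // placeholder aggregation
    }
    List<Tuple2<ADesc, AStats>> result = new ArrayList<>();
    for (Map.Entry<ADesc, AStats> e : grouped.entrySet()) {
        result.add(new Tuple2<>(e.getKey(), e.getValue()));
    }
    return result;
}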
Information:
While merging we get an OutOfMemoryError. We therefore inserted a repartition, but it runs out of memory as well. All stages up to and including the flatMapToPair work correctly. We tried different values for PARTITIONS, up to 5000 tasks; most tasks have very little work to do, some have to process a few MB, and 3 tasks (independent of the number of partitions) always run out of memory. My question is: why does Spark shuffle the data so unevenly, and why does it run out of memory during the repartition?
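One way we could check the skew is to count how many records exist per key before the final groupByKey (a rough sketch using the RDDs above; the threshold is arbitrary):
// A few very large counts would explain the imbalance, since groupByKey sends
// all values of one key to a single task.
Map<ADesc, Long> valuesPerKey = dataInformation.countByKey();
valuesPerKey.forEach((key, count) -> {
    if (count > 100_000) {   // illustrative threshold
        System.out.println(key + " -> " + count + " values");
    }
});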
Upvotes: 0
Views: 1163
Reputation: 71
I solved my problem and will give a short overview here. Maybe someone will find it useful in the future.
The problem was in this line:
dataInformation.groupByKey().repartition(PARTITIONS).map(v -> merge(v._1(), v._2()));
I had a lot of objects with the same key that had to be merged, so a lot of objects ended up on the same partition and the task went OOM. I changed the code to use reduceByKey and modified the merge function so that it no longer merges all objects with the same key at once but instead merges two objects at a time. Because the function is associative, the result is the same.
In short: groupByKey grouped too many objects into one task.
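A minimal sketch of the change, assuming merge is now the modified two-argument version that combines two AStats values into one:
// Because merge is associative, Spark can pre-aggregate on the map side instead of
// pulling all values of a hot key into a single task.
JavaPairRDD<ADesc, AStats> mergedStats = dataInformation.reduceByKey((s1, s2) -> merge(s1, s2));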
Upvotes: 2