Reputation: 73
I have a Spark job written in Scala that uses Spark RDDs. Because of the expensive groupBy operations, I get this error:
Container killed by YARN for exceeding memory limits. 22.4 GB of 22 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
I increased the memory overhead, but I get the same error. I use 10 r4.xlarge machines; I also tried r4.2xlarge and even r4.4xlarge, but the error is the same. The data I'm testing on is 5 GB gzipped (almost 50 GB unzipped, almost 6 million records).
Some configurations:
spark.executor.memory: 20480M
spark.driver.memory: 21295M
spark.yarn.executor.memoryOverhead: 2g
spark.executor.instances: 10
And the code looks like this:
val groupedEntitiesRDD = datasetRDD
  .groupBy(_.entityId)
  .map({ case (key, valueIterator) => key -> valueIterator.toList })
  .persist(StorageLevel.MEMORY_AND_DISK)

val deduplicatedRDD = groupedEntitiesRDD
  .flatMap({ case (_, entities) => deduplication(entities) })

def deduplication(entities: List[StreamObject[JsValue]]): List[StreamObject[JsValue]] = {
  entities
    .groupBy(_.deduplicationKey)
    .values
    .map(duplicates => duplicates.maxBy(_.processingTimestamp.toEpochSecond))
    .toList
}
Upvotes: 0
Views: 2898
Reputation: 13154
From my experience and from what I have read in the release notes of Spark 2.x, one needs to allocate a lot more off-heap memory (spark.yarn.executor.memoryOverhead) than in Spark 1.x.
You have only assigned 2 GB to memoryOverhead and 20 GB of executor memory. I believe you would get better results if you changed that to, say, 8 GB of memoryOverhead and 14 GB of executor memory.
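For illustration, that split could look something like this (a rough sketch only, with illustrative values; the same settings are just as commonly passed as --conf flags to spark-submit):

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative values, not tuned for this workload:
// 14 GB of on-heap executor memory plus 8 GB of off-heap overhead.
val conf = new SparkConf()
  .setAppName("dedup-job") // hypothetical application name
  .set("spark.executor.memory", "14g")
  .set("spark.yarn.executor.memoryOverhead", "8g")
val sc = new SparkContext(conf)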
Should you still run into memory issues (like actual OOMs being thrown), you will need to look into data skew. groupBy operations in particular will frequently cause serious data skew.
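A quick way to check for skew is to count records per entityId and look at the heaviest keys (a rough diagnostic sketch, assuming the same datasetRDD as in your question):

// Count records per entityId and print the 20 heaviest keys
val keyCounts = datasetRDD
  .map(obj => (obj.entityId, 1L))
  .reduceByKey(_ + _)

keyCounts
  .sortBy({ case (_, count) => count }, ascending = false)
  .take(20)
  .foreach(println)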
One final thing: you write that you use RDDs. I hope you mean DataFrames or Datasets? RDDs have very low performance with groupBy (see for instance this blog post for the reason why), so if you are on RDDs you should use reduceByKey instead. But essentially you should use DataFrames (or Datasets) instead, where groupBy is indeed the right way to go.
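To make that concrete, here is a minimal sketch of the Dataset route. The Event case class, its fields and the sample rows are hypothetical stand-ins for your StreamObject[JsValue], which you haven't shown:

import org.apache.spark.sql.SparkSession

// Hypothetical flat schema standing in for StreamObject[JsValue]
case class Event(entityId: String, deduplicationKey: String, processingEpochSecond: Long, payload: String)

val spark = SparkSession.builder().appName("dedup-sketch").getOrCreate()
import spark.implicits._

// Tiny in-memory sample just to make the sketch self-contained
val events = Seq(
  Event("e1", "k1", 100L, "old"),
  Event("e1", "k1", 200L, "new"),
  Event("e2", "k2", 150L, "only")
).toDS()

// Keep only the newest record per (entityId, deduplicationKey); this runs as
// a partial + final aggregation, so no per-key List is ever materialized.
val deduplicated = events
  .groupByKey(e => (e.entityId, e.deduplicationKey))
  .reduceGroups((a, b) => if (a.processingEpochSecond >= b.processingEpochSecond) a else b)
  .map { case (_, event) => event }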
EDIT!
You asked in a comment how to convert groupBy to reduceByKey. You can do that like this:
datasetRDD
  .map(streamObject => (streamObject.entityId, List(streamObject)))
  .reduceByKey(_ ++ _)
  .flatMap { case (_, entities) => deduplication(entities) }
You haven't specified the data structure of these entities, but it looks like you are looking for some max value and in effect throwing away unwanted data. That should be built into the reduceByKey operation, so that you filter away unnecessary data while reducing.
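As an illustration of that idea (again assuming the entityId, deduplicationKey and processingTimestamp fields from your question), keying by the pair (entityId, deduplicationKey) lets reduceByKey keep only the newest record per key, so no intermediate List is built at all:

val deduplicatedRDD = datasetRDD
  .map(obj => ((obj.entityId, obj.deduplicationKey), obj))
  .reduceByKey { (a, b) =>
    // keep the record with the newer processing timestamp
    if (a.processingTimestamp.toEpochSecond >= b.processingTimestamp.toEpochSecond) a else b
  }
  .map { case (_, obj) => obj }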
Upvotes: 2