Soof

Reputation: 73

Spark memory limit exceeded issue

I have a job that runs on Spark and is written in Scala, using Spark RDDs. Because of expensive groupBy operations I get this error: Container killed by YARN for exceeding memory limits. 22.4 GB of 22 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. I increased the memory overhead but I get the same error. I use 10 machines of type r4.xlarge. I tried using r4.2xlarge and even r4.4xlarge, but I still get the same error. The data I'm testing on is 5 GB of gzipped data (almost 50 GB unzipped, almost 6 million records).

some configurations:

spark.executor.memory: 20480M
spark.driver.memory: 21295M
spark.yarn.executor.memoryOverhead: 2g
spark.executor.instances: 10

And the code looks like this:

val groupedEntitiesRDD = datasetRDD 
  .groupBy(_.entityId) 
  .map({ case (key, valueIterator) => key -> valueIterator.toList }) 
  .persist(StorageLevel.MEMORY_AND_DISK) 

val deduplicatedRDD = groupedEntitiesRDD 
  .flatMap({ case (_, entities) => deduplication(entities) }) 

def deduplication(entities: List[StreamObject[JsValue]]): List[StreamObject[JsValue]] = { 
  entities 
    .groupBy(_.deduplicationKey) 
    .values 
    .map(duplicates => duplicates.maxBy(_.processingTimestamp.toEpochSecond)) 
    .toList 
}

Upvotes: 0

Views: 2898

Answers (1)

Glennie Helles Sindholt

Reputation: 13154

From my experience, and from what I have read in the release notes of Spark 2.x, one needs to allocate a lot more off-heap memory (spark.yarn.executor.memoryOverhead) than in Spark 1.x.

You have only assigned 2 GB to memoryOverhead and 20 GB to executor memory. I believe you would get better results if you changed that to, say, 8 GB of memoryOverhead and 14 GB of executor memory.
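Just to make the split concrete, here is a sketch of those two settings on a SparkConf (in practice you would usually pass them as --conf flags to spark-submit instead, and the exact values depend on your cluster):

import org.apache.spark.SparkConf

// Sketch only: more off-heap headroom, less on-heap executor memory
val conf = new SparkConf()
  .set("spark.executor.memory", "14g")              // was 20480M
  .set("spark.yarn.executor.memoryOverhead", "8g")  // was 2g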

Should you still run into memory issues (like actual OOMs being thrown), you will need to look into data skew. groupBy operations in particular frequently cause serious data skew.
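A quick way to check for skew (a sketch, assuming the datasetRDD and entityId field from your question) is to count records per key and look at the heaviest keys:

// Sketch: count records per entityId and print the 20 heaviest keys
val keyCounts = datasetRDD
  .map(obj => (obj.entityId, 1L))
  .reduceByKey(_ + _)

keyCounts
  .sortBy(_._2, ascending = false)
  .take(20)
  .foreach { case (id, n) => println(s"$id -> $n records") }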

One final thing: you write that you use RDDs - I hope you mean DataFrames or Datasets? RDDs have very poor performance with groupBy (see for instance this blog post for the reason why), so if you are on RDDs you should use reduceByKey instead. But essentially you should use DataFrames (or Datasets) instead, where groupBy is indeed the right way to go.

EDIT!

You asked in a comment how to convert groupBy to reduceByKey. You can do that like this:

datasetRDD
  .map(streamObject => (streamObject.entityId, List(streamObject)))  // key each record by its entityId
  .reduceByKey(_ ++ _)                                               // merge the per-entity lists
  .flatMap { case (_, entities) => deduplication(entities) }

You haven't specified the data structure of these entities, but it looks like you are looking for some max value and in effect throwing away unwanted data. That should be built into the reduceByKey operation, so that you filter away the unnecessary data while reducing. See the sketch below.
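For example (a sketch only, assuming StreamObject exposes the entityId, deduplicationKey and processingTimestamp fields shown in your question), you could key on both the entity id and the deduplication key, so that reduceByKey only ever keeps the newest record per group and never builds intermediate lists:

// Sketch: keep only the newest record per (entityId, deduplicationKey) while reducing
val deduplicatedRDD = datasetRDD
  .map(obj => ((obj.entityId, obj.deduplicationKey), obj))
  .reduceByKey { (a, b) =>
    if (a.processingTimestamp.toEpochSecond >= b.processingTimestamp.toEpochSecond) a else b
  }
  .map { case (_, obj) => obj }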

Upvotes: 2
