Reputation: 695
I have been trying to get a Spark job to run to completion for several days now. I was finally able to get it to complete, but there was still a large number of failed tasks where executors were being killed with the following message:
ExecutorLostFailure (executor 77 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 45.1 GB of 44.9 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead
These are the properties I am passing to the cluster:
[
{
"classification": "spark-defaults",
"properties": {
"spark.executor.memory": "41000m",
"spark.driver.memory": "8000m",
"spark.executor.cores": "6",
"spark.shuffle.service.enabled": "true",
"spark.executor.instances": "98",
"spark.yarn.executor.memoryOverhead": "5000"
}
}
]
The cluster comprises 20 machines, each with 32 cores and 240G of memory. Should I just continue to raise the memoryOverhead, or is there a point where it indicates a deeper problem? The error this time seemed to occur during a coalesce from 5000 partitions down to 500 before writing the resulting data to S3. I am guessing the coalesce caused a shuffle, and since the cluster was already low on memory, it pushed it too far.
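For context, my rough packing math (which may well be off, since YARN will not hand the executors a whole node): 98 executors across 20 nodes is about 5 per node, and 5 × (41000 MiB + 5000 MiB) = 230000 MiB, roughly 225 GiB of each node's 240G, before the OS, the YARN daemons and the driver take their share. So there is not much headroom.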
The workflow is as follows:
combineByKey using the call below, which essentially merges individual objects into arrays of objects by key (a fuller sketch follows this list)
combineByKey(new function, add function, merge function, new HashPartitioner(5000), false, null);
More Maps
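Roughly, the combineByKey step looks like the sketch below. Record and keyedRdd are placeholders for my actual object type and keyed RDD, but the shape of the call is the same:

import org.apache.spark.HashPartitioner
import scala.collection.mutable.ArrayBuffer

// Placeholders standing in for my real object type and keyed RDD.
case class Record(id: String, payload: String)
val keyedRdd = sc.parallelize(Seq(("a", Record("1", "x")), ("a", Record("2", "y"))))

val grouped = keyedRdd.combineByKey(
  (r: Record) => ArrayBuffer(r),                                // new function: start an array for a key
  (buf: ArrayBuffer[Record], r: Record) => buf += r,            // add function: append one object
  (a: ArrayBuffer[Record], b: ArrayBuffer[Record]) => a ++= b,  // merge function: concatenate partial arrays
  new HashPartitioner(5000),
  false,  // mapSideCombine
  null    // serializer
)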
Another question is how the 44.9 number from above is derived. I figured the max memory would be executor memory + memoryOverhead which would be 46G not 44.9G.
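(One thought I have not verified: if YARN reports that figure in GiB while the settings are in MiB, then 41000 + 5000 = 46000 MiB, and 46000 / 1024 ≈ 44.9 GiB, which would match the message.)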
Any help would be greatly appreciated, Nathan
Upvotes: 2
Views: 4291
Reputation: 13154
From my experience this indicates a deeper problem, and from what you have posted I see a couple of pitfalls.
First of all, you may want to have a look at the partition sizes, as the OOM could easily be caused by data skew created during the combineByKey operation. Perhaps some keys are very frequent?
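A quick way to check (just a sketch on my part: pairs stands in for your keyed RDD before the combineByKey, and combined for the RDD coming out of it) is to count records per key and to look at the resulting partition sizes:

// Heaviest keys - a few keys with huge counts point to skew.
pairs
  .mapValues(_ => 1L)
  .reduceByKey(_ + _)
  .sortBy(_._2, ascending = false)
  .take(20)
  .foreach(println)

// Rough record count per partition after the HashPartitioner(5000).
combined
  .mapPartitionsWithIndex((i, it) => Iterator((i, it.size)))
  .collect()
  .sortBy(p => -p._2)
  .take(20)
  .foreach(println)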
If not, I would look at the coalesce call. You haven't posted the code, so I can only guess at the DAG being generated, but I would keep an eye on the coalesce function and the other operations executed in the same write stage.
Spark executes in stages, and from what I can tell from your explanation, you call coalesce just before write. So depending on how many partitions you have going into this final stage, and depending on the transformations done in this stage, you may actually be operating on fewer partitions than required, thus resulting in the OOM exception.
It's a little complicated to explain in words, but I will try to give a simple example of what could be going on.
Imagine the simple scenario where you read in a file of key-value pairs of, say, (Int, Double). You then apply some function to all the values, say round, and then wish to write the output back to a single file, so you call coalesce(1) followed by write. The code would look something like this:
import org.apache.spark.sql.Row
import sqlContext.implicits._  // needed for .toDF() on the resulting RDD

val df = sqlContext.read.parquet("/path/to/my/file/")
df.map { case Row(key: Int, value: Double) => (key, math.round(value)) }
  .toDF()
  .coalesce(1)
  .write
  .parquet("/my/output/path/")
Now one might think that the map operation is executed in parallel across your entire cluster, but if you pay attention to the Spark UI, you will notice that this task is not distributed across your cluster. Because of the coalesce(1), Spark knows that everything needs to end up in a single partition, so it simply starts gathering all the data into one partition, applying the map function as it goes along. As you can probably imagine, this can easily end up in OOM exceptions with a more complicated transformation.
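If that is what is happening in your job, one thing worth trying (a sketch from me, since I cannot confirm it without your code; result and the S3 path are placeholders) is to force a shuffle before the write, for example with repartition, so the heavy transformations keep their original 5000 partitions and only the write itself runs on 500:

result
  .repartition(500)   // shuffles, unlike coalesce(500), so the upstream work stays fully parallel
  .write
  .parquet("s3://your-bucket/your-output-path/")

The shuffle itself is not free, of course, so it is worth comparing both variants in the Spark UI.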
I hope this gives you a couple of pointers as to where to look. Good luck :)
Upvotes: 6