Nathan Case

Reputation: 695

Spark ExecutorLostFailure Memory Exceeded

I have been trying to get a Spark job to run to completion for several days now, and I was finally able to get it to complete, but there was still a large number of failed tasks where executors were being killed with the following message:

ExecutorLostFailure (executor 77 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 45.1 GB of 44.9 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead

These are the properties I am passing to the cluster:

[
    {
        "classification": "spark-defaults",
        "properties": {
            "spark.executor.memory": "41000m",
            "spark.driver.memory": "8000m",
            "spark.executor.cores": "6",
            "spark.shuffle.service.enabled": "true",
            "spark.executor.instances": "98",
            "spark.yarn.executor.memoryOverhead": "5000"
        }
    }
]

The cluster comprises 20 machines, each with 32 cores and 240G of memory. Should I just continue to raise the memoryOverhead, or is there a point where it indicates a deeper problem? The error this time seemed to occur during a coalesce from 5000 partitions down to 500 before writing the resulting data to S3. I am guessing the coalesce caused a shuffle, and since the cluster was already low on memory it pushed it too far.

The workflow is as follows:

  1. Load Parquet files from S3 into a DataFrame
  2. Extract the set of unique keys that group the data, using a SQL query against the DataFrame
  3. Transform the DataFrame to a JavaRDD and apply several map functions
  4. mapToPair the data
  5. combineByKey using the call below, which essentially merges individual objects into arrays of objects by key

    combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(5000), false, null);

  6. More map operations

  7. For each of the unique keys, filter the RDD to get just the tuples with that key, then persist each of those subsets to disk after coalescing

Another question is how the 44.9 number from above is derived. I figured the max memory would be executor memory + memoryOverhead, which would be 46G, not 44.9G.

Any help would be greatly appreciated, Nathan

Upvotes: 2

Views: 4291

Answers (1)

Glennie Helles Sindholt

Reputation: 13154

From my experience this indicates a deeper problem, and from what you have posted I see a couple of pitfalls.

First of all, you may want to have a look at the partition sizes, as the OOM could easily be caused by data skew created during the combineByKey operation. Perhaps some keys are very frequent?
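
For instance, a quick way to check for skew is to count records per key and per partition. This is just a generic sketch: sc and pairRdd stand in for your SparkContext and for whatever pair RDD feeds the combineByKey.

// Stand-in for the pair RDD that goes into combineByKey (placeholder data).
val pairRdd = sc.parallelize(Seq(("a", 1.0), ("a", 2.0), ("b", 3.0)))

// Records per key: heavily skewed keys float to the top of this list.
pairRdd
  .mapValues(_ => 1L)
  .reduceByKey(_ + _)
  .sortBy(-_._2)
  .take(20)
  .foreach(println)

// Records per partition: very uneven counts also point to skew.
pairRdd.glom().map(_.length).collect().foreach(println)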

If not, I would look at the coalesce call. You haven't posted the code, so I can only guess at the DAG being generated, but I would pay attention to the coalesce and to the other operations executed in the same write stage.

Spark executes in stages, and from what I can tell from your explanation, you call coalesce just before write. Depending on how many partitions you have going into this final stage, and depending on the transformations done in it, you may actually be operating on fewer partitions than required, thus resulting in the OOM exception.
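
If you want to see which transformations Spark has folded into that final write stage, printing the lineage is a cheap first check. This is only a sketch; finalRdd stands in for whatever you coalesce just before writing, and df for the corresponding DataFrame:

// Indentation changes in the lineage mark shuffle boundaries, so everything
// printed at the same indentation level runs together in one stage.
println(finalRdd.toDebugString)

// On the DataFrame side, the physical plan gives the same kind of information.
df.explain(true)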

It's a little complicated to explain in words, but I will try to give a simple example of what could be going on.

Imagine the simple scenario where you read in a file of key-value pairs of, say, (Int, Double), and then apply some function to all the values, like round. You then wish to write the output back to a single file, so you call coalesce(1) followed by write. The code would look something like this:

import org.apache.spark.sql.Row
import sqlContext.implicits._

val df = sqlContext.read.parquet("/path/to/my/file/")
df.map { case Row(key: Int, value: Double) => (key, math.round(value)) }
  .toDF()
  .coalesce(1)
  .write
  .parquet("/my/output/path/")

Now one might think that the map operation is executed in parallel across your entire cluster, but if you pay attention to the Spark UI, you will notice that this task is not distributed. Because of the coalesce(1), Spark knows that everything needs to end up in a single partition, so it simply starts gathering all the data into one partition, applying the map function as it goes along. As you can probably imagine, this can easily end up in OOM exceptions with a more complicated transformation.
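
If you do need the single output file, one common workaround (just a sketch, continuing the example above) is to force a shuffle with repartition instead, so the map still runs in parallel and only the final write happens on one partition:

df.map { case Row(key: Int, value: Double) => (key, math.round(value)) }
  .toDF()
  .repartition(1)  // the shuffle boundary keeps the map stage distributed across the cluster
  .write
  .parquet("/my/output/path/")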

I hope this gives you a couple of pointers as to where to look. Good luck :)

Upvotes: 6
