sds

Reputation: 60014

spark scalability: what am I doing wrong?

I am processing data with Spark. It works with a day's worth of data (40G) but fails with OOM on a week's worth of data:

import pyspark
import datetime
import operator
sc = pyspark.SparkContext()
sqc = pyspark.sql.SQLContext(sc)
sc.union([sqc.parquetFile(hour.strftime('.....'))
          .map(lambda row:(row.id, row.foo))
          for hour in myrange(beg,end,datetime.timedelta(0,3600))]) \
  .reduceByKey(operator.add).saveAsTextFile("myoutput")

The number of different IDs is less than 10k. Each ID is a smallish int. The job fails because too many executors fail with OOM. When the job succeeds (on small inputs), "myoutput" is about 100k.

  1. What am I doing wrong?
  2. I tried replacing saveAsTextFile with collect (because I actually want to do some slicing and dicing in Python before saving). There was no difference in behavior: same failure. Is this to be expected?
  3. I used to have reduce(lambda x,y: x.union(y), [sqc.parquetFile(...)...]) instead of sc.union. Which is better? Does it make any difference?

The cluster has 25 nodes with 825GB RAM and 224 cores among them.

Invocation is spark-submit --master yarn --num-executors 50 --executor-memory 5G.

A single RDD has ~140 columns and covers one hour of data, so a week is a union of 168(=7*24) RDDs.
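The myrange helper is not shown in the code above. A minimal sketch of what it could look like, assuming a half-open range [beg, end) and hypothetical dates picked only for illustration, confirms that a week yields 168 hourly inputs:

```python
import datetime

def myrange(beg, end, step):
    """Yield datetimes from beg (inclusive) to end (exclusive) in increments of step.

    Assumed behaviour: the real helper is not part of the question.
    """
    current = beg
    while current < end:
        yield current
        current += step

# Hypothetical dates, chosen only for illustration.
beg = datetime.datetime(2015, 1, 1)
end = beg + datetime.timedelta(days=7)

hours = list(myrange(beg, end, datetime.timedelta(0, 3600)))
print(len(hours))  # 168
```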

Upvotes: 6

Views: 1561

Answers (2)

gsamaras

Reputation: 73366

Spark very often suffers from out-of-memory errors when scaling. In these cases the programmer has to do some fine-tuning, or recheck the code to make sure it doesn't do anything excessive, such as collecting all the data in the driver, which is very likely to exceed the memoryOverhead limit, no matter how big you set it.

To understand what is happening, you should realize when YARN decides to kill a container for exceeding memory limits: that happens when the container goes beyond the memoryOverhead limit.

In the Scheduler you can check the Event Timeline to see what happened with the containers. If YARN has killed a container, it will appear red, and when you hover over or click it, you will see a message like:

Container killed by YARN for exceeding memory limits. 16.9 GB of 16 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
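If you need to scan many logs for this message, a small parser can pull out the numbers. This is only a sketch: the pattern is written against the exact wording quoted above, and other Spark or YARN versions may phrase the message differently. The function name is made up for illustration.

```python
import re

# Pattern for the message format quoted above; other versions may word it differently.
PATTERN = re.compile(r"([\d.]+) GB of ([\d.]+) GB physical memory used")

def parse_yarn_kill(message):
    """Return (used_gb, limit_gb) from a YARN kill message, or None if it does not match."""
    match = PATTERN.search(message)
    if match is None:
        return None
    return float(match.group(1)), float(match.group(2))

msg = ("Container killed by YARN for exceeding memory limits. "
       "16.9 GB of 16 GB physical memory used. "
       "Consider boosting spark.yarn.executor.memoryOverhead.")

used, limit = parse_yarn_kill(msg)
print(used, limit)  # 16.9 16.0
```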


So in that case, what you want to focus on is these configuration properties (values are examples on my cluster):

# More executor memory overhead
spark.yarn.executor.memoryOverhead          4096

# More driver memory overhead
spark.yarn.driver.memoryOverhead            8192

# Max on my nodes
#spark.executor.cores                        8
#spark.executor.memory                       12G

# For the executors
spark.executor.cores                        6
spark.executor.memory                       8G

# For the driver
spark.driver.cores                          6
spark.driver.memory                         8G

The first thing to do is to increase the memoryOverhead.
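To see what raising the overhead buys you, here is a rough sketch of the arithmetic: the limit YARN enforces on a container is the executor memory plus the overhead. The default overhead used below (the larger of 384 MB and 10% of executor memory) is an assumption that matches the documented default of some Spark releases; the percentage has differed between versions. The sketch also ignores YARN rounding the request up to its allocation increment.

```python
def container_limit_mb(executor_memory_mb, overhead_mb=None,
                       overhead_percent=10, min_overhead_mb=384):
    """Approximate container size: executor memory plus memory overhead.

    If overhead_mb is not given, fall back to an assumed default of
    max(min_overhead_mb, overhead_percent % of executor memory).
    """
    if overhead_mb is None:
        overhead_mb = max(min_overhead_mb,
                          executor_memory_mb * overhead_percent // 100)
    return executor_memory_mb + overhead_mb

# Values from the example configuration above: 8G executors, 4096 MB overhead.
print(container_limit_mb(8 * 1024, 4096))  # 12288

# 5G executors with the assumed default overhead.
print(container_limit_mb(5 * 1024))        # 5632
```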

In the driver or in the executors?

When you are overviewing your cluster from the UI, you can click on the Attempt ID and check the Diagnostics Info which should mention the ID of the container that was killed. If it is the same as with your AM Container, then it's the driver, else the executor(s).


That didn't resolve the issue, now what?

You have to fine-tune the number of cores and the heap memory you are providing. PySpark does most of its work in off-heap memory, so you don't want to give too much space to the heap, since that would be wasted. You don't want to give too little either, because then the garbage collector will have issues. Recall that these are JVMs.

As described here, a worker can host multiple executors, so the number of cores used affects how much memory every executor has; decreasing the number of cores might help.
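A back-of-the-envelope sketch of that trade-off follows. The node size (32 GB, 8 cores) is hypothetical, and the model is deliberately crude: it ignores memory reserved for the OS and the NodeManager, and it assumes concurrent tasks share an executor's heap evenly, which Spark does not guarantee.

```python
def executors_per_node(node_memory_mb, node_cores,
                       executor_memory_mb, overhead_mb, executor_cores):
    """How many executors fit on one node, limited by memory and by cores."""
    by_memory = node_memory_mb // (executor_memory_mb + overhead_mb)
    by_cores = node_cores // executor_cores
    return min(by_memory, by_cores)

def memory_per_task_mb(executor_memory_mb, executor_cores):
    """Rough heap share per concurrently running task (one task per core)."""
    return executor_memory_mb // executor_cores

# Hypothetical node: 32 GB of RAM and 8 cores.
node_memory_mb, node_cores = 32 * 1024, 8

# 8G executors with 4096 MB overhead, at 6 and then 4 cores per executor.
for cores in (6, 4):
    print(cores,
          executors_per_node(node_memory_mb, node_cores, 8192, 4096, cores),
          memory_per_task_mb(8192, cores))
# 6 1 1365
# 4 2 2048
```

With fewer cores per executor, each running task gets a larger share of the heap, and more executors fit on the node before cores become the limit.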

I have written this up in more detail in memoryOverhead issue in Spark and Spark – Container exited with a non-zero exit code 143, mostly so that I won't forget! Other options would be spark.default.parallelism and/or spark.storage.memoryFraction, but in my experience they didn't help.


You can pass configuration flags as sds mentioned, or like this:

spark-submit --properties-file my_properties

where "my_properties" is something like the attributes I list above.

For non-numerical values, you could do this:

spark-submit --conf spark.executor.memory='4G' 
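If you launch jobs from a script, the same flags can be assembled programmatically. A minimal sketch, where the helper name and the script name my_job.py are hypothetical:

```python
def spark_submit_args(conf, script):
    """Build a spark-submit argument list with one --conf flag per setting."""
    args = ["spark-submit"]
    for key, value in sorted(conf.items()):
        args += ["--conf", "{}={}".format(key, value)]
    args.append(script)
    return args

conf = {
    "spark.executor.memory": "4G",
    "spark.yarn.executor.memoryOverhead": 1000,
}

# my_job.py is a placeholder for your own application.
print(" ".join(spark_submit_args(conf, "my_job.py")))
# spark-submit --conf spark.executor.memory=4G --conf spark.yarn.executor.memoryOverhead=1000 my_job.py
```

The resulting list can be handed to subprocess.run as-is, which avoids shell quoting issues.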

Upvotes: 2

sds

Reputation: 60014

It turned out that the problem was not with Spark, but with YARN. The solution is to run Spark with

spark-submit --conf spark.yarn.executor.memoryOverhead=1000

(or modify yarn config).

Upvotes: 2
