kupe

Reputation: 71

Spark - java.lang.OutOfMemoryError: Requested array size exceeds VM limit

I am attempting a groupBy operation on a dataframe in Cloudera's Spark (2.1.0) on a 7-node cluster with about 512GB of RAM in total. My code is as follows.

from pyspark.sql.functions import collect_list

# ndf has two columns: name (the userid) and file_name
ndf = ndf.repartition(20000)

# Collect every file_name for each user into a single array column
by_user_df = ndf.groupBy(ndf.name) \
            .agg(collect_list("file_name")) \
            .withColumnRenamed('collect_list(file_name)', 'file_names')

by_user_df = by_user_df.repartition(20000)
by_user_df.count()

ndf is a dataframe with two columns, a userid and a filename. I am trying to build a list of filenames per userid to pass to CountVectorizer and then clustering.
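For context, here is a rough sketch of that downstream step (not the exact code I run; the column names are the ones produced above):

from pyspark.ml.feature import CountVectorizer

# Each row of by_user_df is (name, file_names), where file_names is an
# array of strings; CountVectorizer turns that array into a sparse vector.
cv = CountVectorizer(inputCol="file_names", outputCol="features")
cv_model = cv.fit(by_user_df)
features_df = cv_model.transform(by_user_df)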

I get the following error:

java.lang.OutOfMemoryError: Requested array size exceeds VM limit
    at org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:73)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:166)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

From what I have read, this happens when an allocation request exceeds either the contiguous memory the JVM can provide or the JVM's maximum array size. Most of the recommendations are to parallelize more by splitting the work into more partitions.

I have about 6k users and about 7k total filenames. I have noticed that the executor that dies spends the majority of its time in Garbage Collection.

I have tried the following thus far:

  1. Re-partitioning the ndf dataframe and the resulting dataframe. I have tried up to 60k in the repartition arguments for each.
  2. I have set "spark.sql.shuffle.partitions" in steps up to 20000.
  3. I have boosted executor memory to 25G.
  4. Even though the executor that dies does not appear to be the driver, I have boosted driver memory to 25G as well. (An example spark-submit invocation with these settings is sketched below.)
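Roughly, the settings in points 2-4 were passed at submit time like this (the script name is just a placeholder):

spark-submit \
    --conf spark.sql.shuffle.partitions=20000 \
    --executor-memory 25G \
    --driver-memory 25G \
    my_job.py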

As an update to this question: I realized that in this case I am doing binary clustering over the data, so I really only need one instance of each filename per user. Changing collect_list to collect_set left me with the output I needed, and it was apparently small enough to run within the given parameters. I'm still going to try to fix the original case.
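For reference, a sketch of the collect_set variant that worked for me (same shape as the original code, with only the aggregate swapped):

from pyspark.sql.functions import collect_set

# collect_set keeps each file name at most once per user, which is all
# the binary clustering needs and keeps the aggregated rows much smaller.
by_user_df = ndf.groupBy(ndf.name) \
            .agg(collect_set("file_name")) \
            .withColumnRenamed('collect_set(file_name)', 'file_names')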

Upvotes: 2

Views: 9674

Answers (1)

user3689574

Reputation: 1676

First of all, I don't really understand why you need such a high number of partitions. I don't know how many cores you have on each of the 7 workers, but I doubt you need more than 200 partitions. (The extremely high partition counts you are using may actually explain why your workers die in garbage collection.)
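As a rough sketch (I'm assuming core counts here, since they're not in the question), a few tasks per core is usually plenty:

# Assuming, say, 7 workers x 16 cores = 112 cores, 2-4 tasks per core
# gives roughly 200-450 partitions, far below 20000 or 60000.
# `spark` is the active SparkSession.
ndf = ndf.repartition(200)
spark.conf.set("spark.sql.shuffle.partitions", "200")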

Your problem looks like a memory problem within the JVM's own limits, so I see no reason to boost driver or executor memory.

I think what you need is to set -Xss, -Xmx or -XX:MaxPermSize, as explained here: How to fix "Requested array size exceeds VM limit" error in Java?

To do so, you need to pass --conf spark.driver.extraJavaOptions and --conf spark.executor.extraJavaOptions when you submit your Spark job.

For example:

--conf spark.driver.extraJavaOptions="-Xss10m -XX:MaxPermSize=512M" --conf spark.executor.extraJavaOptions="-Xss10m -XX:MaxPermSize=128M"
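In a full submit command that would look something like this (the script name is just a placeholder):

spark-submit \
    --conf spark.driver.extraJavaOptions="-Xss10m -XX:MaxPermSize=512M" \
    --conf spark.executor.extraJavaOptions="-Xss10m -XX:MaxPermSize=128M" \
    my_job.py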

Upvotes: 1
