Reputation: 71
I am attempting a groupBy operation on a DataFrame in Cloudera's Spark (2.1.0) on a 7-node cluster with about 512 GB of RAM in total. My code is as follows:
from pyspark.sql.functions import collect_list

ndf = ndf.repartition(20000)
# Group by user and collect each user's file names into a list
by_user_df = ndf.groupBy(ndf.name) \
    .agg(collect_list("file_name")) \
    .withColumnRenamed('collect_list(file_name)', 'file_names')
by_user_df = by_user_df.repartition(20000)
by_user_df.count()
ndf is a DataFrame containing two columns, a userid and a filename. I am trying to build a list of filenames per userid to pass to CountVectorizer and then to clustering.
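For context, a minimal sketch of the downstream step I have in mind (assuming by_user_df as built above; the clustering stage itself is omitted):

from pyspark.ml.feature import CountVectorizer

# Turn each user's list of filenames into a sparse count vector
cv = CountVectorizer(inputCol="file_names", outputCol="features")
cv_model = cv.fit(by_user_df)
vectorized_df = cv_model.transform(by_user_df)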
I get the following error:
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:73)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:166)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
From what I have read, this is due to allocating an array that is either larger than the VM can handle in contiguous memory or larger than the system's maximum array size. Many of the recommendations are to parallelize more by splitting the data into more partitions.
I have about 6k users and about 7k total filenames. I have noticed that the executor that dies spends the majority of its time in Garbage Collection.
I have tried the following thus far:
As an update to this question: I realized that in this case I am doing a binary clustering over the data, so I really only need one of each of the filenames. Changing collect_list to collect_set left me with the output I needed, and it was apparently small enough to run within the given parameters. I'm still going to try to fix the original case.
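For completeness, a sketch of the variant that worked for me (same pipeline, just swapping the aggregate; collect_set deduplicates the filenames per user, and .alias is used here instead of withColumnRenamed):

from pyspark.sql.functions import collect_set

by_user_df = ndf.groupBy(ndf.name) \
    .agg(collect_set("file_name").alias("file_names"))
by_user_df.count()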
Upvotes: 2
Views: 9674
Reputation: 1676
First of all, I don't really understand why you need such a high number of partitions. I don't know how many cores you have on each of the 7 workers, but I doubt you need more than 200 partitions (the extremely high partition count you are using may actually explain why your workers die from garbage collection).
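A rough sketch of what I mean, assuming sc is your SparkContext (the factor of 3 per available core is just a common rule of thumb, not something specific to your job):

# Size the partition count from the cluster's cores instead of hard-coding 20000
target_partitions = sc.defaultParallelism * 3
ndf = ndf.repartition(target_partitions)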
Your problem looks like a memory limit within the JVM itself, so I see no reason to boost driver or executor memory.
I think what you need is to set -Xss, -Xmx, or -XX:MaxPermSize, as explained here: How to fix "Requested array size exceeds VM limit" error in Java?
To do so, you need to pass --conf spark.driver.extraJavaOptions and --conf spark.executor.extraJavaOptions when you run Spark.
For example:
--conf spark.driver.extraJavaOptions="-Xss10m -XX:MaxPermSize=512M " --conf spark.executor.extraJavaOptions="-Xss10m -XX:MaxPermSize=128M "
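A full submit command might then look something like this (your_app.py is just a placeholder for your PySpark script):

spark-submit \
  --conf spark.driver.extraJavaOptions="-Xss10m -XX:MaxPermSize=512M" \
  --conf spark.executor.extraJavaOptions="-Xss10m -XX:MaxPermSize=128M" \
  your_app.py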
Upvotes: 1