BigTobster

Reputation: 849

Pyspark java.lang.OutOfMemoryError: Requested array size exceeds VM limit

I am running a Pyspark job:

spark-submit --master yarn-client --driver-memory 150G --num-executors 8 --executor-cores 4 --executor-memory 150G benchmark_script_1.py hdfs:///tmp/data/sample150k 128 hdfs:///tmp/output/sample150k | tee ~/output/sample150k.log

The job itself is pretty standard. It just grabs some files and counts them:

from datetime import datetime

print(str(datetime.now()) + " - Ingesting files...")
files = sc.wholeTextFiles(inputFileDir, partitions)
fileCount = files.count()
print(str(datetime.now()) + " - " + str(fileCount) + " files ingested")

The source folder contains ~150,000 files. It's about 35G without replication and 105G with replication, which works out to roughly 250KB per file on average. Fairly heavy but not insane.

Running the above gives the following stacktrace:

15/08/11 15:39:20 WARN TaskSetManager: Lost task 61.3 in stage 0.0 (TID 76, <NODE>): java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:794)
        at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:833)
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:897)
        at java.io.DataInputStream.read(DataInputStream.java:100)
        at org.spark-project.guava.io.ByteStreams.copy(ByteStreams.java:207)
        at org.spark-project.guava.io.ByteStreams.toByteArray(ByteStreams.java:252)
        at org.apache.spark.input.WholeTextFileRecordReader.nextKeyValue(WholeTextFileRecordReader.scala:83)
        at org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader.nextKeyValue(CombineFileRecordReader.java:69)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:405)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:243)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
        at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:205)

More information can be found in the offending executor logs:

15/08/11 12:28:18 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
15/08/11 12:28:18 ERROR util.Utils: Uncaught exception in thread stdout writer for python
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
        at java.lang.StringCoding.encode(StringCoding.java:350)
        at java.lang.String.getBytes(String.java:939)
        at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:573)
        at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:395)
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:405)
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:405)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:405)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:243)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
        at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:205)
Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar/pyspark/daemon.py", line 162, in manager
  File "/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar/pyspark/daemon.py", line 60, in worker
  File "/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar/pyspark/worker.py", line 126, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar/pyspark/serializers.py", line 528, in read_int
15/08/11 12:28:18 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[stdout writer for python,5,main]
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
        at java.lang.StringCoding.encode(StringCoding.java:350)
        at java.lang.String.getBytes(String.java:939)
        at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:573)
        at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:395)
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:405)
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:405)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:405)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:243)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
        at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:205)
    raise EOFError
EOFError
Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar/pyspark/daemon.py", line 162, in manager
  File "/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar/pyspark/daemon.py", line 60, in worker
  File "/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar/pyspark/worker.py", line 126, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar/pyspark/serializers.py", line 528, in read_int
15/08/11 12:28:18 ERROR executor.Executor: Exception in task 7.0 in stage 0.0 (TID 5)
java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:794)
        at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:833)
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:897)
        at java.io.DataInputStream.read(DataInputStream.java:100)
        at org.spark-project.guava.io.ByteStreams.copy(ByteStreams.java:207)
        at org.spark-project.guava.io.ByteStreams.toByteArray(ByteStreams.java:252)
        at org.apache.spark.input.WholeTextFileRecordReader.nextKeyValue(WholeTextFileRecordReader.scala:83)
        at org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader.nextKeyValue(CombineFileRecordReader.java:69)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:405)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:243)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
        at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:205)
    raise EOFError
EOFError

I have disabled the HDFS cache:

conf.set("fs.hdfs.impl.disable.cache", "true")
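
(Depending on the Spark version, a bare fs.* key set on the SparkConf may not be copied into the Hadoop Configuration; two more explicit ways to set the same flag, with conf being the SparkConf and sc the live SparkContext:)

# Prefix the key with spark.hadoop. so Spark copies it into the Hadoop Configuration...
conf.set("spark.hadoop.fs.hdfs.impl.disable.cache", "true")

# ...or set it on the running context's Hadoop Configuration directly:
sc._jsc.hadoopConfiguration().set("fs.hdfs.impl.disable.cache", "true")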

Note that the exact same script in Scala does not have any issues at all.

Although this is a large job, there is plenty of memory available for it. Does anyone know what the issue could be?

UPDATE

I allocated more memory to the JVM:

export JAVA_OPTS="-Xmx6G -XX:MaxPermSize=2G -XX:+UseCompressedOops"

Sadly, no improvement.
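
(On YARN, the driver and executor JVMs normally take their flags from Spark's own settings rather than from JAVA_OPTS; a sketch of the equivalent spark-submit flags. The heap size itself has to go through --driver-memory/--executor-memory, as in the original command, since -Xmx is not allowed in these options:)

spark-submit \
  --conf "spark.driver.extraJavaOptions=-XX:MaxPermSize=2G" \
  --conf "spark.executor.extraJavaOptions=-XX:MaxPermSize=2G" \
  ...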

Upvotes: 3

Views: 5245

Answers (1)

Thomas Taylor

Reputation: 1357

I'm having a similar problem with spark-submit and Java, saving an 8GB DataFrame in a Docker container with 16 cores and 300GB of RAM. I haven't resolved the problem yet, but I've come across a couple of possible workarounds:

Starting on page 77, Lightbend suggests this is a problem with the shell; using @transient or encapsulating the code in an object may be a workaround. This doesn't seem to apply in either of our cases.

Databricks suggests that increasing spark.sql.shuffle.partitions may help, changing it from the default 200 to 400. I've tried 800 and 2000 in spark-defaults.conf, but I still get the OOM error.
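
For concreteness, this is roughly what that override looks like in spark-defaults.conf (2000 being the last value I tried):

spark.sql.shuffle.partitions   2000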

Databricks also suggests calling DataFrame.repartition(400) in the code; alternatively, increase the number of partitions passed as the last parameter to your call to sc.wholeTextFiles(inputFileDir, partitions). Both are sketched below.
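
A quick sketch of both variants against the asker's PySpark job (the partition counts here are illustrative, not tested values):

# Either raise the minPartitions argument when reading...
files = sc.wholeTextFiles(inputFileDir, 2000)

# ...or repartition an existing DataFrame:
df = df.repartition(400)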

The JAVA_OPTS suggestions from StackOverflow wouldn't apply, as -XX:+UseCompressedOops is disabled (in Java 8) if the heap size is greater than 32GB.

EDIT

Also tried:

  • spark.default.parallelism=1000 (the default is the number of cores), passed as sketched after this list. Still the OOM error.
  • dataFrame.repartition(1000) in the code. Still the OOM error.
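
A sketch of how the parallelism override can be passed, equivalent to a spark.default.parallelism line in spark-defaults.conf:

spark-submit --conf spark.default.parallelism=1000 ...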

Possible Workarounds

  • Using an intermediate RDD<LabeledPoint> allowed me to create the DataFrame, but the Spark reflection schema doesn't work with MLlib (missing numClasses attribute).

    // labeledPoints is a List<LabeledPoint>; jsc is the JavaSparkContext
    DataFrame df = sqlContext.createDataFrame(jsc.parallelize(labeledPoints), LabeledPoint.class);

  • Using an intermediate JSON file allowed me to create a DataFrame that works with MLlib.

    saveAsJson(rows, filename);  // rows: the generated List<Row>; saveAsJson is my helper
    DataFrame df = sqlContext.read().json(filename);

Upvotes: 3
