navige

Reputation: 2517

Spark: Read and Write to Parquet leads to OutOfMemoryError: Java heap space

I wrote some code to read a Parquet file, change the schema slightly, and write the data to a new Parquet file. The code looks as follows:

...
val schema = StructType(
  List(
    StructField("id", LongType, false),
    StructField("data", ArrayType(FloatType), false)
  )
)

val data = sqlContext.read.parquet(file.getAbsolutePath)
val revisedData = data.map(r => Row(r.getInt(0).toLong, r.getSeq[Float](1)))
val df = sqlContext.createDataFrame(revisedData, schema)

Writer.writeToParquet(df)

with Writer being

import org.apache.spark.sql.{DataFrame, SaveMode}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

object Writer {
  def writeToParquet(df: DataFrame): Unit = {
    // "path" is the output location, defined elsewhere
    val future = Future {
      df.write.mode(SaveMode.Append).save(path)
    }

    Await.ready(future, Duration.Inf)
  }
}

For a file of about 4 GB my program breaks with an OutOfMemoryError: Java heap space. I have given the executor 6 GB of memory (using -Dspark.executor.memory=6g), raised the JVM heap size (using -Xmx6g), and increased the Kryo serializer buffer to 2 GB (using System.setProperty("spark.kryoserializer.buffer.mb", "2048")). However, I still get the error.
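For completeness, the same values could also be applied programmatically on the SparkConf before the context is created. The snippet below is only a sketch of that wiring (same property names as above; the SparkConf-based setup and the app name are assumptions, my program actually passes them as the flags mentioned):

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

// Sketch only: same property names as above, applied via SparkConf
// instead of -D flags / System.setProperty.
val conf = new SparkConf()
  .setAppName("parquet-rewrite")                 // hypothetical app name
  .set("spark.executor.memory", "6g")
  .set("spark.kryoserializer.buffer.mb", "2048") // pre-1.4 property name
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)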

This is the stack trace:

java.lang.OutOfMemoryError: Java heap space
  at com.esotericsoftware.kryo.io.Output.<init>(Output.java:35)
  at org.apache.spark.serializer.KryoSerializer.newKryoOutput(KryoSerializer.scala:76)
  at org.apache.spark.serializer.KryoSerializerInstance.output$lzycompute(KryoSerializer.scala:243)
  at org.apache.spark.serializer.KryoSerializerInstance.output(KryoSerializer.scala:243)
  at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:247)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:236)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:744)

What can I do to avoid this error?

Upvotes: 3

Views: 5764

Answers (2)

user1767316

Reputation: 3641

Using sparklyr, I hit the same OutOfMemoryError: despite reducing spark.kryoserializer.buffer, I was not able to read back a Parquet file I had been able to write. My solution was to:

disable the "eager" memory load option (memory = FALSE):

spark_read_parquet(sc, name = curName, file.path("file://", srcFile), header = TRUE, memory = FALSE)

Spark 2.3.0, sparklyr 1.0.0, R version 3.4.2

Upvotes: 0

Mateusz Dymczyk

Reputation: 15141

Following my comment, two things:

1) You need to watch out for the spark.kryoserializer.buffer.mb property name: in the newest Spark it has been replaced by spark.kryoserializer.buffer and spark.kryoserializer.buffer.max.

2) You also have to be careful with the size of the buffer relative to your heap: it has to be big enough to hold a single record you are writing, but not much more, because Kryo allocates an explicit byte[] of that size (and allocating a single 2 GB byte array is usually a bad idea). Try lowering the buffer size with the proper property; see the sketch below.
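A minimal sketch of what that could look like on a recent Spark version; the values here are placeholders, tune them to the largest record you actually serialize:

import org.apache.spark.SparkConf

// Assumed example values: the initial buffer stays small and the
// maximum stays well below the heap, since Kryo backs the buffer
// with a single byte[] of up to that size.
val conf = new SparkConf()
  .set("spark.kryoserializer.buffer", "64k")      // initial per-task buffer
  .set("spark.kryoserializer.buffer.max", "256m") // upper bound, must fit the largest record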

Upvotes: 4
