Reputation: 2517
I wrote some code to read a Parquet file, slightly change its schema, and write the data to a new Parquet file. The code looks as follows:
...
val schema = StructType(
  List(
    StructField("id", LongType, false),
    StructField("data", ArrayType(FloatType), false)
  )
)
val data = sqlContext.read.parquet(file.getAbsolutePath)
val revisedData = data.map(r => Row(r.getInt(0).toLong, r.getSeq[Float](1)))
val df = sqlContext.createDataFrame(revisedData, schema)
Writer.writeToParquet(df)
with Writer being:
object Writer {
  def writeToParquet(df: DataFrame): Unit = {
    // run the write asynchronously, then block until it finishes
    val future = Future {
      df.write.mode(SaveMode.Append).save(path)
    }
    Await.ready(future, Duration.Inf)
  }
}
For a file of about 4 GB my program breaks with an OutOfMemoryError: Java heap space. I have given the executor 6 GB of memory (using -Dspark.executor.memory=6g), raised the JVM heap space (using -Xmx6g), and increased the Kryo serializer buffer to 2 GB (using System.setProperty("spark.kryoserializer.buffer.mb", "2048")). However, I still get the error.
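For reference, a minimal sketch of how these settings can be expressed as a programmatic SparkConf (Spark 1.x property names; the app name and master below are placeholders, not my actual values):
// Sketch only: same settings as above, set via SparkConf.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf()
  .setAppName("parquet-rewrite")                 // placeholder
  .setMaster("local[*]")                         // placeholder
  .set("spark.executor.memory", "6g")            // equivalent to -Dspark.executor.memory=6g
  .set("spark.kryoserializer.buffer.mb", "2048") // equivalent to the System.setProperty call
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)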
This is the stack trace:
java.lang.OutOfMemoryError: Java heap space
at com.esotericsoftware.kryo.io.Output.<init>(Output.java:35)
at org.apache.spark.serializer.KryoSerializer.newKryoOutput(KryoSerializer.scala:76)
at org.apache.spark.serializer.KryoSerializerInstance.output$lzycompute(KryoSerializer.scala:243)
at org.apache.spark.serializer.KryoSerializerInstance.output(KryoSerializer.scala:243)
at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:247)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:236)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:744)
What can I do to avoid this error?
Upvotes: 3
Views: 5764
Reputation: 3641
Using sparklyr, I ran into the same OutOfMemoryError despite reducing spark.kryoserializer.buffer, and could not read back a Parquet file I had been able to write. My solution was to disable the "eager" memory load option (memory=FALSE):
spark_read_parquet(sc, name = curName, file.path("file://", srcFile), header = TRUE, memory = FALSE)
Spark 2.3.0, sparklyr 1.0.0, R 3.4.2
Upvotes: 0
Reputation: 15141
Following my comment, two things:
1) Watch out for the spark.kryoserializer.buffer.mb property name: in recent Spark versions it was replaced by spark.kryoserializer.buffer and spark.kryoserializer.buffer.max.
2) Be careful with the buffer size relative to your heap size: it has to be big enough to hold the largest single record you are writing, but not much more, since Kryo allocates an explicit byte[] of that size (and allocating a single 2 GB byte array is usually a bad idea). Try lowering your buffer size with the proper property.
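For example, a minimal sketch of setting the renamed properties with a much smaller buffer (the values are illustrative; size the max to the largest record you serialize):
// Sketch only: renamed Kryo buffer properties with a small initial buffer.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer", "64k")      // initial per-task buffer
  .set("spark.kryoserializer.buffer.max", "256m") // ceiling the buffer can grow to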
Upvotes: 4