aurelius

Reputation: 528

Buffer/Capacity error saving dataframe as parquet in Spark

I'm having issues with PySpark writing a dataframe to Hadoop as partitioned parquet files.

This works well:

salesDfSpark.write.option("header",True) \
        .partitionBy("Country") \
        .mode("overwrite") \
        .csv("hdfs://master:9000/sales/{}_{}.csv".format(csvName,epochNow)) #Hadoop Namenode at port 9000
print("Sales Dataframe stored in Hadoop.")

This doesn't work:

salesDfSpark.write.option("header",True) \
        .partitionBy("Country") \
        .mode("overwrite") \
        .parquet("hdfs://master:9000/sales/{}_{}.parquet".format(csvName,epochNow)) #Hadoop Namenode at port 9000
print("Sales Dataframe stored in Hadoop.")

Error using OpenJDK 8:

Caused by: java.lang.IllegalArgumentException
    at java.nio.Buffer.limit(Buffer.java:275)
    at org.xerial.snappy.Snappy.compress(Snappy.java:156)

Error using OpenJDK 11:

Caused by: java.lang.IllegalArgumentException: newLimit > capacity: (80 > 76)
    at java.base/java.nio.Buffer.createLimitException(Buffer.java:372)

Apart from changing the Java version, I've also tested both snappy-java 1.1.8.4 and snappy-java 1.1.4, with the same result. Does anyone have experience with this issue?

Edit:

This gives me the same error:

salesDfSpark.write.saveAsTable('salesDfSpark')

Output:

2022-06-12 10:35:29,468 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 7.0 (TID 1184) (worker2 executor 0): org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:500)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:321)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$16(FileFormatWriter.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: newLimit > capacity: (144 > 112)
    at java.base/java.nio.Buffer.createLimitException(Buffer.java:372)
    at java.base/java.nio.Buffer.limit(Buffer.java:346)
    at java.base/java.nio.ByteBuffer.limit(ByteBuffer.java:1107)
    at java.base/java.nio.MappedByteBuffer.limit(MappedByteBuffer.java:235)
    at java.base/java.nio.MappedByteBuffer.limit(MappedByteBuffer.java:67)
    at org.xerial.snappy.Snappy.compress(Snappy.java:156)
    at org.apache.parquet.hadoop.codec.SnappyCompressor.compress(SnappyCompressor.java:78)
    at org.apache.hadoop.io.compress.CompressorStream.compress(CompressorStream.java:81)
    at org.apache.hadoop.io.compress.CompressorStream.finish(CompressorStream.java:92)
    at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:167)
    at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:168)
    at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:59)
    at org.apache.parquet.column.impl.ColumnWriterBase.writePage(ColumnWriterBase.java:387)
    at org.apache.parquet.column.impl.ColumnWriteStoreBase.flush(ColumnWriteStoreBase.java:186)
    at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:29)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:185)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:124)
    at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:164)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:41)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseCurrentWriter(FileFormatDataWriter.scala:64)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:75)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:105)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:305)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1496)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:311)
    ... 9 more

Upvotes: 2

Views: 3318

Answers (2)

Govinda Rathi

Reputation: 116

The following sequence worked for me:

  1. I upgraded the versions of the Parquet-related dependencies and the snappy-java dependency. [This made me run into java.lang.NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappy]
  2. I then added this line to the Dockerfile that builds Spark's distribution image: apk add libc6-compat (see the excerpt below).
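
For reference, a minimal sketch of the Dockerfile change; the base image is omitted here since it depends on your build, but apk implies an Alpine (musl-based) image, and libc6-compat provides the glibc compatibility layer that snappy-java's native library expects:

# Excerpt from the Dockerfile that builds the Spark distribution image
RUN apk add libc6-compat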

Upvotes: 1

I also had the same error. The solution may look unrelated to the error message, but the following worked for me.

In my case, the Java temp directory "/tmp" was mounted on the server with the noexec flag. The snappy-java library could not extract and execute its native library in the temp directory, and it threw the stack trace above because of the temp directory's permissions.

The solution is to point it at another directory that has execute rights, using the "-Dorg.xerial.snappy.tempdir" system property.

spark-submit --conf "spark.driver.extraJavaOptions=-Dorg.xerial.snappy.tempdir=/my_user/temp_folder" my_app.py
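
The stack trace in the question shows the task failing on an executor, so the same system property may also need to be set for the executors; a sketch of that, reusing the same temp folder:

spark-submit \
    --conf "spark.driver.extraJavaOptions=-Dorg.xerial.snappy.tempdir=/my_user/temp_folder" \
    --conf "spark.executor.extraJavaOptions=-Dorg.xerial.snappy.tempdir=/my_user/temp_folder" \
    my_app.py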

The noexec flag can be checked with the following command:

findmnt -l
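
To narrow the output down to /tmp, something like this should also work (look for noexec in the OPTIONS column):

findmnt -l | grep /tmp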

Upvotes: 4
