Reputation: 528
I'm having issues with PySpark writing a dataframe to Hadoop as partitioned parquet files.
This works well:
salesDfSpark.write.option("header",True) \
.partitionBy("Country") \
.mode("overwrite") \
.csv("hdfs://master:9000/sales/{}_{}.csv".format(csvName,epochNow)) #Hadoop Namenode at port 9000
print("Sales Dataframe stored in Hadoop.")
This doesn't work:
salesDfSpark.write.option("header",True) \
.partitionBy("Country") \
.mode("overwrite") \
.parquet("hdfs://master:9000/sales/{}_{}.parquet".format(csvName,epochNow)) #Hadoop Namenode at port 9000
print("Sales Dataframe stored in Hadoop.")
Error when using OpenJDK 8:
Caused by: java.lang.IllegalArgumentException
at java.nio.Buffer.limit(Buffer.java:275)
at org.xerial.snappy.Snappy.compress(Snappy.java:156)
Error when using OpenJDK 11:
Caused by: java.lang.IllegalArgumentException: newLimit > capacity: (80 > 76)
at java.base/java.nio.Buffer.createLimitException(Buffer.java:372)
Apart from changing the Java version, I've also tested both snappy-java-1.1.8.4 and snappy-java-1.1.4, with the same result. Does anyone have experience with this issue?
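A minimal way to check whether the failure is tied to the Snappy codec is to write the same dataframe with a different Parquet compression codec (gzip here is only an illustrative choice; everything else mirrors the snippet above):
salesDfSpark.write.option("header",True) \
.option("compression","gzip") \
.partitionBy("Country") \
.mode("overwrite") \
.parquet("hdfs://master:9000/sales/{}_{}.parquet".format(csvName,epochNow)) # same target path, gzip instead of the default Snappy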
Edit:
This gives me the same error:
salesDfSpark.write.saveAsTable('salesDfSpark')
Output:
2022-06-12 10:35:29,468 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 7.0 (TID 1184) (worker2 executor 0): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:500)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:321)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$16(FileFormatWriter.scala:229)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: newLimit > capacity: (144 > 112)
at java.base/java.nio.Buffer.createLimitException(Buffer.java:372)
at java.base/java.nio.Buffer.limit(Buffer.java:346)
at java.base/java.nio.ByteBuffer.limit(ByteBuffer.java:1107)
at java.base/java.nio.MappedByteBuffer.limit(MappedByteBuffer.java:235)
at java.base/java.nio.MappedByteBuffer.limit(MappedByteBuffer.java:67)
at org.xerial.snappy.Snappy.compress(Snappy.java:156)
at org.apache.parquet.hadoop.codec.SnappyCompressor.compress(SnappyCompressor.java:78)
at org.apache.hadoop.io.compress.CompressorStream.compress(CompressorStream.java:81)
at org.apache.hadoop.io.compress.CompressorStream.finish(CompressorStream.java:92)
at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:167)
at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:168)
at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:59)
at org.apache.parquet.column.impl.ColumnWriterBase.writePage(ColumnWriterBase.java:387)
at org.apache.parquet.column.impl.ColumnWriteStoreBase.flush(ColumnWriteStoreBase.java:186)
at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:29)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:185)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:124)
at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:164)
at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:41)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseCurrentWriter(FileFormatDataWriter.scala:64)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:75)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:105)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:305)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1496)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:311)
... 9 more
Upvotes: 2
Views: 3318
Reputation: 116
The following sequence worked for me:
Upvotes: 1
Reputation: 101
I had the same error. The fix looks unrelated to the error message, but the following worked for me.
In my case the Java temp directory "/tmp" was mounted on the server with the noexec flag. The snappy-java library could not extract and execute its native library in that temp directory and threw the stack trace above because of those mount permissions.
The solution is to point snappy-java at a directory with execute rights via the "-Dorg.xerial.snappy.tempdir" system property:
spark-submit --conf "spark.driver.extraJavaOptions=-Dorg.xerial.snappy.tempdir=/my_user/temp_folder" my_app.py
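If the application builds its own SparkSession instead of relying purely on spark-submit flags, the same property can be passed as Spark configuration. A sketch, assuming the executors see the same noexec /tmp (the failing task in the question runs on an executor, so spark.executor.extraJavaOptions is set as well; the app name and temp path are placeholders):
from pyspark.sql import SparkSession

# Directory that is mounted without the noexec flag
snappy_tmpdir = "-Dorg.xerial.snappy.tempdir=/my_user/temp_folder"

spark = (
    SparkSession.builder
    .appName("sales_parquet_write")
    .config("spark.driver.extraJavaOptions", snappy_tmpdir)
    .config("spark.executor.extraJavaOptions", snappy_tmpdir)
    .getOrCreate()
)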
The noexec flag can also be checked with the following command:
findmnt -l
Upvotes: 4