Reputation: 1898
After repartitioning a DataFrame in Spark 1.3.0, I get a Parquet exception when saving to Amazon S3.
logsForDate
  .repartition(10)
  .saveAsParquetFile(destination) // <-- Exception here
The exception I receive is:
java.io.IOException: The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: COLUMN
at parquet.hadoop.ParquetFileWriter$STATE.error(ParquetFileWriter.java:137)
at parquet.hadoop.ParquetFileWriter$STATE.startBlock(ParquetFileWriter.java:129)
at parquet.hadoop.ParquetFileWriter.startBlock(ParquetFileWriter.java:173)
at parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:152)
at parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:112)
at parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:73)
at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:635)
at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:649)
at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:649)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
I would like to know what the problem is and how to solve it.
Upvotes: 14
Views: 7082
Reputation: 26271
Are you sure this is not due to SPARK-6351 ("Wrong FS" when saving Parquet to S3)? If it is, then it has nothing to do with repartitioning, and it was fixed in Spark 1.3.1. If, however, you are stuck on Spark 1.3.0 like me because you are using CDH 5.4.0, I just figured out last night a way to work around it directly from the code, with no config file change:
spark.hadoopConfiguration.set("fs.defaultFS", "s3n://mybucket")
After that, I could save parquet files to S3 without problem.
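For reference, a minimal end-to-end sketch of that workaround on Spark 1.3.0 (the SparkContext is named spark to match the line above; the bucket, output path, and toy DataFrame are hypothetical):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val spark = new SparkContext(new SparkConf().setAppName("parquet-to-s3"))
val sqlContext = new SQLContext(spark)
import sqlContext.implicits._

// Work around SPARK-6351 by pointing the default filesystem at the target bucket.
spark.hadoopConfiguration.set("fs.defaultFS", "s3n://mybucket")

// Any DataFrame will do for illustration.
val logsForDate = spark.parallelize(Seq((1, "a"), (2, "b"))).toDF("id", "value")

// The repartition + Parquet save from the question now succeeds.
logsForDate
  .repartition(10)
  .saveAsParquetFile("s3n://mybucket/path/to/output")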
Note that there are several drawbacks to this, however. I suspect (though I haven't tried) that writing to a filesystem other than S3, or perhaps even to another bucket, will then fail. It might also force Spark to write its temporary files to S3 rather than locally, but I haven't checked that either.
Upvotes: 1
Reputation: 7021
I can actually reproduce this problem with Spark 1.3.1 on EMR when saving to S3.
Saving to HDFS, however, works fine. You could save to HDFS first and then use e.g. s3distcp to move the files to S3, as sketched below.
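A rough sketch of that two-step route, assuming the logsForDate DataFrame from the question and hypothetical HDFS/S3 paths (the exact s3distcp invocation depends on your EMR version):

// Step 1: write the Parquet output to HDFS, which is not affected by the S3 issue.
logsForDate
  .repartition(10)
  .saveAsParquetFile("hdfs:///tmp/logsForDate.parquet")

// Step 2: copy the files from HDFS to S3, e.g. with s3distcp on the EMR master
// (illustrative shell invocation):
//   s3-dist-cp --src hdfs:///tmp/logsForDate.parquet --dest s3://mybucket/logsForDate.parquet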
Upvotes: 5
Reputation: 6693
I ran into this error when calling saveAsParquetFile against HDFS. It was caused by the datanode socket write timeout, so I changed it to a longer value in the Hadoop settings:
<property>
  <name>dfs.datanode.socket.write.timeout</name>
  <value>3000000</value>
</property>
<property>
  <name>dfs.socket.timeout</name>
  <value>3000000</value>
</property>
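If editing hdfs-site.xml cluster-wide is not an option, I believe the same keys can also be set on the job's Hadoop configuration from the code; a minimal sketch, assuming a SparkContext named sc:

// Raise the datanode socket timeouts for this job only (values in milliseconds).
sc.hadoopConfiguration.set("dfs.datanode.socket.write.timeout", "3000000")
sc.hadoopConfiguration.set("dfs.socket.timeout", "3000000")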
Hope this helps, if you can set similar timeouts for your S3 setup.
Upvotes: 1