learnerer

Reputation: 484

Multipart upload error to S3 from Spark

I am getting the error "Upload attempts for part num: 2 have already reached max limit of: 5, will throw exception and fail" when trying to close a SequenceFile writer. The full log of the exception is below:

16/12/30 19:47:01 INFO s3n.MultipartUploadOutputStream: uploadPart /mnt/s3/57b63810-c20a-438c-a73f-48d50e0be7d2-0001 94317523 bytes md5: 05ww/fe3pNni9Zvfm+l4Gg== md5hex: d39c30fdf7b7a4d9e2f59bdf9be9781a
16/12/30 19:47:12 INFO s3n.MultipartUploadOutputStream: uploadPart /mnt1/s3/57b63810-c20a-438c-a73f-48d50e0be7d2-0002 94317523 bytes md5: 05ww/fe3pNni9Zvfm+l4Gg== md5hex: d39c30fdf7b7a4d9e2f59bdf9be9781a
16/12/30 19:47:23 INFO s3n.MultipartUploadOutputStream: uploadPart /mnt/s3/57b63810-c20a-438c-a73f-48d50e0be7d2-0003 94317523 bytes md5: 05ww/fe3pNni9Zvfm+l4Gg== md5hex: d39c30fdf7b7a4d9e2f59bdf9be9781a
16/12/30 19:47:35 INFO s3n.MultipartUploadOutputStream: uploadPart /mnt1/s3/57b63810-c20a-438c-a73f-48d50e0be7d2-0004 94317523 bytes md5: 05ww/fe3pNni9Zvfm+l4Gg== md5hex: d39c30fdf7b7a4d9e2f59bdf9be9781a
16/12/30 19:47:46 INFO s3n.MultipartUploadOutputStream: uploadPart /mnt/s3/57b63810-c20a-438c-a73f-48d50e0be7d2-0005 94317523 bytes md5: 05ww/fe3pNni9Zvfm+l4Gg== md5hex: d39c30fdf7b7a4d9e2f59bdf9be9781a
16/12/30 19:47:57 ERROR s3n.MultipartUploadOutputStream: Upload attempts for part num: 2 have already reached max limit of: 5, will throw exception and fail
16/12/30 19:47:57 INFO s3n.MultipartUploadOutputStream: completeMultipartUpload error for key: output/part-20176
java.lang.IllegalStateException: Reached max limit of upload attempts for part
    at com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream.spawnNewFutureIfNeeded(MultipartUploadOutputStream.java:362)
    at com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream.uploadMultiParts(MultipartUploadOutputStream.java:422)
    at com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream.close(MultipartUploadOutputStream.java:471)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:74)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:108)
    at org.apache.hadoop.io.SequenceFile$Writer.close(SequenceFile.java:1290)
   ...
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$18.apply(RDD.scala:727)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$18.apply(RDD.scala:727)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/12/30 19:47:59 INFO s3n.MultipartUploadOutputStream: uploadPart error com.amazonaws.AbortedException: 
16/12/30 19:48:18 INFO s3n.MultipartUploadOutputStream: uploadPart error com.amazonaws.AbortedException: 

All I get is the error saying that the 5 retries failed; the log does not show the underlying cause. Has anyone seen this error before? What could be the reason for it?

I am writing the sequence files using my own implementation of a multi-output format:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile
import org.apache.hadoop.io.SequenceFile.{Metadata, Writer}

class MultiOutputSequenceFileWriter(prefix: String, suffix: String) extends Serializable {
  private val writers = collection.mutable.Map[String, SequenceFile.Writer]()

  /**
    * @param pathKey    folder within prefix where the content will be written
    * @param valueKey   key of the data to be written
    * @param valueValue value of the data to be written
    */
  def write(pathKey: String, valueKey: Any, valueValue: Any) = {
    if (!writers.contains(pathKey)) {
      val path = new Path(prefix + "/" + pathKey + "/" + "part-" + suffix)
      val hadoopConf = new Configuration()
      hadoopConf.setEnum("io.seqfile.compression.type", SequenceFile.CompressionType.NONE)
      val fs = FileSystem.get(hadoopConf)
      writers(pathKey) = SequenceFile.createWriter(hadoopConf, Writer.file(path),
        Writer.keyClass(valueKey.getClass()),
        Writer.valueClass(valueValue.getClass()),
        Writer.bufferSize(fs.getConf().getInt("io.file.buffer.size", 4096)), // 4KB
        Writer.replication(fs.getDefaultReplication()),
        Writer.blockSize(1073741824), // 1GB
        Writer.progressable(null),
        Writer.metadata(new Metadata()))
    }
    writers(pathKey).append(valueKey, valueValue)
  }

  def close = writers.values.foreach(_.close())
}

I am trying to write the sequence file as follows:

...
rdd.mapPartitionsWithIndex { (p, it) =>
  val writer = new MultiOutputSequenceFileWriter("s3://bucket/output/", p.toString)
  for ((key1, key2, data) <- it) {
    ...
    writer.write(key1, key2, data)
    ...
  }
  writer.close
  Nil.iterator
}.foreach((x: Nothing) => ()) // to force evaluation of the returned iterator
...


Upvotes: 3

Views: 12975

Answers (2)

learnerer

Reputation: 484

An AWS support engineer mentioned that at the time of the error there were a lot of requests hitting the bucket. The job was retrying the default number of times (5), and most probably all of the retries were being throttled. I have since increased the number of retries with the following config parameter, added while submitting the job:

spark.hadoop.fs.s3.maxRetries=20

Additionally, I compressed the output so that the number of requests made to S3 is reduced. I have not seen the failure in several runs since making these changes.
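A minimal sketch of how both changes can be wired in; the app name, variable names, and the gzip codec are just illustrative choices, not the exact code from my job:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.io.SequenceFile
import org.apache.hadoop.io.compress.GzipCodec

// Raise the EMRFS part-upload retry limit (default 5) before the SparkContext is created;
// this is equivalent to passing --conf spark.hadoop.fs.s3.maxRetries=20 to spark-submit.
val sparkConf = new SparkConf()
  .setAppName("multi-output-sequence-files") // illustrative name
  .set("spark.hadoop.fs.s3.maxRetries", "20")
val sc = new SparkContext(sparkConf)

// Block-compressing the sequence files shrinks each part and therefore the number of
// multipart PUT requests; this Writer option can be passed to SequenceFile.createWriter
// instead of the CompressionType.NONE setting used in the question's writer.
val compression = SequenceFile.Writer.compression(
  SequenceFile.CompressionType.BLOCK, new GzipCodec())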

Upvotes: 8

stevel

Reputation: 13430

The writer (Amazon code, by the way, not something the Spark or Hadoop teams will handle) uploads data in blocks as it is generated, in background threads, and commits the remaining data plus the multipart upload in close(), which is also where the code blocks waiting for all pending uploads to complete.

It sounds like some of the PUT requests have been failing, and it is in the close() call that the failure is picked up and reported. I don't know whether the EMR s3:// client uses that block size to size its upload parts; I'd personally recommend something smaller there, like 128MB.

Anyway: assume transient network problems, or that the EC2 VM you were allocated has bad network connectivity. Ask for a new VM.
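If the EMR client does derive its part size from the writer's block size, the change would be the one-line tweak below against the Writer options shown in the question; treat it as a sketch, not a confirmed fix:

import org.apache.hadoop.io.SequenceFile.Writer

// Hypothetical tweak to the question's writer options: ask for 128 MB blocks instead of 1 GB,
// so each buffered chunk (and, if derived from it, each multipart part) is smaller.
val smallerBlockSize: Writer.Option = Writer.blockSize(128L * 1024 * 1024) // was Writer.blockSize(1073741824)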

Upvotes: 0
