mwlon
mwlon

Reputation: 1006

Spark S3A write omits upload part without failure

I'm using Spark 2.4.0 with Hadoop 2.7, hadoop-aws 2.7.5 to write Datasets to parquet files on S3A. Occasionally a file part will be missing; i.e. part 00003 here:

> aws s3 ls my-bucket/folder/
2019-02-28 13:07:21          0 _SUCCESS
2019-02-28 13:06:58   79428651 part-00000-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:06:59   79586172 part-00001-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:00   79561910 part-00002-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:01   79192617 part-00004-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:07   79364413 part-00005-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:08   79623254 part-00006-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:10   79445030 part-00007-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:10   79474923 part-00008-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:11   79477310 part-00009-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:12   79331453 part-00010-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:13   79567600 part-00011-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:13   79388012 part-00012-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:14   79308387 part-00013-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:15   79455483 part-00014-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:17   79512342 part-00015-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:18   79403307 part-00016-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:18   79617769 part-00017-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:19   79333534 part-00018-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:20   79543324 part-00019-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet

What concerns me the most is that the Spark application SUCCEEDS.

2019-02-28 21:05:39 INFO  AmazonHttpClient:448 - Unable to execute HTTP request: Read timed out
java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at org.apache.http.impl.io.AbstractSessionInputBuffer.fillBuffer(AbstractSessionInputBuffer.java:161)
    at org.apache.http.impl.io.SocketInputBuffer.fillBuffer(SocketInputBuffer.java:82)
    at org.apache.http.impl.io.AbstractSessionInputBuffer.readLine(AbstractSessionInputBuffer.java:278)
    at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138)
    at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56)
    at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:259)
    at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:286)
    at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:257)
    at org.apache.http.impl.conn.ManagedClientConnectionImpl.receiveResponseHeader(ManagedClientConnectionImpl.java:207)
    at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:273)
    at com.amazonaws.http.protocol.SdkHttpRequestExecutor.doReceiveResponse(SdkHttpRequestExecutor.java:66)
    at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:125)
    at org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:684)
    at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:486)
    at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:835)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:384)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3480)
    at com.amazonaws.services.s3.AmazonS3Client.listObjects(AmazonS3Client.java:604)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:960)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.deleteUnnecessaryFakeDirectories(S3AFileSystem.java:1144)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.finishedWrite(S3AFileSystem.java:1133)
    at org.apache.hadoop.fs.s3a.S3AOutputStream.close(S3AOutputStream.java:142)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
    at org.apache.parquet.hadoop.util.HadoopPositionOutputStream.close(HadoopPositionOutputStream.java:64)
    at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:685)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:122)
    at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:74)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:244)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:239)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:245)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:168)
...

(this stack trace repeated 6 times)

I am tweaking the Hadoop S3A configurations to see if this can happen less frequently, but what I really want is for the application to FAIL if this happens. As it is, downstream applications start, expecting data to be present, and produce incorrect results due to the missing data.

How can I change Spark/Hadoop's behavior in this case?

Upvotes: 0

Views: 442

Answers (2)

stevel
stevel

Reputation: 13430

that's called "side effects of an inconsistent filesystem with a job committer which depends on consistent directory listings to rename work into place"

Fixes

  • Use a consistency layer; for S3A that's S3Guard
  • Use an alternate committer: For ASF Spark and Hadoop 3.1, that's the "zero rename committer"
  • Radical but best in long term: Use a different layout for data, with Apache Iceberg being the one I have in mind

Update: This is not true in this specific instance, as Ceph is the FS and it is consistent.

Upvotes: 0

mwlon
mwlon

Reputation: 1006

It appears to be impossible to get around this issue (at least in Hadoop 2.7), so for now I've added an assertion after each Spark S3 write ensuring that the number of file parts matches the number of partitions in the Dataset's RDD:

  def overwriteParquetS3(
    ds: Dataset[_],
    bucket: String,
    folder: String
  ): Unit = {
    val numPartitions = ds.rdd.getNumPartitions
    val destination = GeneralUtils.joinPaths("s3a://", bucket, folder)

    ds
        .write
        .mode(SaveMode.Overwrite)
        .parquet(destination)

    val fs = FileSystem.get(
      URI.create(s"s3a://$bucket/"),
      ds.sparkSession.sparkContext.hadoopConfiguration
    )
    val writtenFiles = fs.listFiles(new Path(destination), false)
    val parts = new ArrayBuffer[LocatedFileStatus]()
    while (writtenFiles.hasNext) {
      val next = writtenFiles.next()
      val name = next.getPath.getName
      if (name.startsWith("part-") && name.endsWith(".parquet")) {
        parts += next
      }
    }

    val filePartStr = parts
        .sortBy(_.getPath.getName)
        .map((fileStatus) => s"${fileStatus.getModificationTime} ${fileStatus.getBlockSize} ${fileStatus.getPath.getName}")
        .mkString("\n\t")
    assert(
      parts.length == numPartitions,
      s"Expected to write dataframe with $numPartitions partitions in $destination but instead " +
          s"found ${parts.length} written parts!\n\t$filePartStr"
    )

    println(s"Confirmed that there numPartitions $numPartitions = ${parts.length} written parts")
  }

This seems to be catching all the cases where the write should error but doesn't.

Upvotes: 1

Related Questions