Brett Larson

Reputation: 73

Unable to get S3A Directory Committers to write files in Spark 3.0.0

We are using Spark 3.0.0 and trying to write to S3A using the new S3A committers that Ryan Blue at Netflix wrote and that steveloughran added to Spark.

We are using the build without Hadoop (spark-3.0.0-bin-without-hadoop) and provide our own Hadoop Jars (Hadoop 3.2.1).
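
For context, the committer wiring looks roughly like this (a sketch: the property and class names follow the Spark cloud-integration docs; the app name and everything else is illustrative):

from pyspark.sql import SparkSession

# A sketch of the committer wiring; property and class names follow the
# Spark cloud-integration docs, the app name is illustrative.
spark = (
    SparkSession.builder
    .appName("s3a-committer-test")
    # Select the staging "directory" committer from the S3A committers
    .config("spark.hadoop.fs.s3a.committer.name", "directory")
    # Route Spark's commit protocol through the cloud committer bindings;
    # this is the class the ClassNotFoundException below complains about
    .config("spark.sql.sources.commitProtocolClass",
            "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
    .config("spark.sql.parquet.output.committer.class",
            "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
    .getOrCreate()
)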

The original issue I was facing was a ClassNotFoundException for org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.

Full trace below:

py4j.protocol.Py4JJavaError: An error occurred while calling o191.parquet.
: java.lang.ClassNotFoundException: org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:206)
    at org.apache.spark.internal.io.FileCommitProtocol$.instantiate(FileCommitProtocol.scala:154)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:113)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:944)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:944)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:396)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:380)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:269)
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:829)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

Looking this up, I found another Stack Overflow answer describing the need to add the spark-hadoop-cloud_2.11 dependency to get this working, and noting that these classes should be bundled with Spark going forward in 3.0.0.

I added the spark-hadoop-cloud_2.11 JAR from Hortonworks, which I found in their repo.

This seems to work: I no longer get the class-not-found error, and I can successfully write a _SUCCESS file with the JSON output I would expect. However, no data files are actually written to our S3 backend (pretty important).
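
The write itself is a plain Parquet write, roughly like this (a sketch; the bucket, prefix, and data are made up):

# Roughly the write that produced the _SUCCESS file shown below;
# the bucket, prefix, and data are made up.
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
df.write.mode("overwrite").parquet("s3a://our-bucket/output/")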

In the logs, it shows:

WARN commit.AbstractS3ACommitter: Task committer attempt_20200630210336_0000_m_000000_0: No pending uploads to commit

but the job seems to think it was successful.

From the _SUCCESS file I am able to see:

{
  "name" : "org.apache.hadoop.fs.s3a.commit.files.SuccessData/1",
  "timestamp" : 1593551030264,
  "date" : "Tue Jun 30 21:03:50 UTC 2020",
  "hostname" : "dcc0b3ce1f04803497e3da46e34ec91b",
  "committer" : "directory",
  "description" : "Task committer attempt_20200630210336_0000_m_000000_0",
  "metrics" : {
    "stream_write_block_uploads" : 0,
    "files_created" : 0,
    "S3guard_metadatastore_put_path_latencyNumOps" : 0,
    "stream_write_block_uploads_aborted" : 0,
    "committer_commits_reverted" : 0,
    "op_open" : 0,
    "stream_closed" : 0,
    "committer_magic_files_created" : 0,
    "object_copy_requests" : 0,
    "s3guard_metadatastore_initialization" : 0,
    "S3guard_metadatastore_put_path_latency90thPercentileLatency" : 0,
    "stream_write_block_uploads_committed" : 0,
    "S3guard_metadatastore_throttle_rate75thPercentileFrequency (Hz)" : 0,
    "S3guard_metadatastore_throttle_rate90thPercentileFrequency (Hz)" : 0,
    "committer_bytes_committed" : 0,
    "op_create" : 0,
    "stream_read_fully_operations" : 0,
    "committer_commits_completed" : 0,
    "object_put_requests_active" : 0,
    "s3guard_metadatastore_retry" : 0,
    "stream_write_block_uploads_active" : 0,
    "stream_opened" : 0,
    "S3guard_metadatastore_throttle_rate95thPercentileFrequency (Hz)" : 0,
    "op_create_non_recursive" : 0,
    "object_continue_list_requests" : 0,
    "committer_jobs_completed" : 1,
    "S3guard_metadatastore_put_path_latency50thPercentileLatency" : 0,
    "stream_close_operations" : 0,
    "stream_read_operations" : 0,
    "object_delete_requests" : 0,
    "fake_directories_deleted" : 0,
    "stream_aborted" : 0,
    "op_rename" : 0,
    "object_multipart_aborted" : 0,
    "committer_commits_created" : 0,
    "op_get_file_status" : 2,
    "s3guard_metadatastore_put_path_request" : 0,
    "committer_commits_failed" : 0,
    "stream_bytes_read_in_close" : 0,
    "op_glob_status" : 0,
    "stream_read_exceptions" : 0,
    "op_exists" : 2,
    "stream_read_version_mismatches" : 0,
    "S3guard_metadatastore_throttle_rate50thPercentileFrequency (Hz)" : 0,
    "S3guard_metadatastore_put_path_latency95thPercentileLatency" : 0,
    "stream_write_block_uploads_pending" : 0,
    "directories_created" : 0,
    "S3guard_metadatastore_throttle_rateNumEvents" : 0,
    "S3guard_metadatastore_put_path_latency99thPercentileLatency" : 0,
    "stream_bytes_backwards_on_seek" : 0,
    "stream_bytes_read" : 0,
    "stream_write_total_data" : 0,
    "committer_jobs_failed" : 0,
    "stream_read_operations_incomplete" : 0,
    "files_copied_bytes" : 0,
    "op_delete" : 0,
    "object_put_bytes_pending" : 0,
    "stream_write_block_uploads_data_pending" : 0,
    "op_list_located_status" : 0,
    "object_list_requests" : 2,
    "stream_forward_seek_operations" : 0,
    "committer_tasks_completed" : 0,
    "committer_commits_aborted" : 0,
    "object_metadata_requests" : 4,
    "object_put_requests_completed" : 0,
    "stream_seek_operations" : 0,
    "op_list_status" : 0,
    "store_io_throttled" : 0,
    "stream_write_failures" : 0,
    "op_get_file_checksum" : 0,
    "files_copied" : 0,
    "ignored_errors" : 0,
    "committer_bytes_uploaded" : 0,
    "committer_tasks_failed" : 0,
    "stream_bytes_skipped_on_seek" : 0,
    "op_list_files" : 0,
    "files_deleted" : 0,
    "stream_bytes_discarded_in_abort" : 0,
    "op_mkdirs" : 0,
    "op_copy_from_local_file" : 0,
    "op_is_directory" : 0,
    "s3guard_metadatastore_throttled" : 0,
    "S3guard_metadatastore_put_path_latency75thPercentileLatency" : 0,
    "stream_write_total_time" : 0,
    "stream_backward_seek_operations" : 0,
    "object_put_requests" : 0,
    "object_put_bytes" : 0,
    "directories_deleted" : 0,
    "op_is_file" : 0,
    "S3guard_metadatastore_throttle_rate99thPercentileFrequency (Hz)" : 0
  },
  "diagnostics" : {
    "fs.s3a.metadatastore.impl" : "org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore",
    "fs.s3a.committer.magic.enabled" : "false",
    "fs.s3a.metadatastore.authoritative" : "false"
  },
  "filenames" : [ ]
}

This is where I'm stumped.

At this point I don't know if this is a problem with how I'm writing or a problem with the random JAR I added to get this class working.

I don't have a good feeling about adding this random JAR from Hortonworks, but I am not able to find any proper Spark JARs in the [Maven Central repo](https://mvnrepository.com/artifact/org.apache.spark/spark-hadoop-cloud_2.11).

From the cloud integration page in the Spark docs, it looks like I should be able to specify a hadoop-cloud_2.12 dependency for Spark 3.0.0, but I do not see it published anywhere online.

Appreciate the help.

Upvotes: 3

Views: 3059

Answers (1)

stevel

Reputation: 13430

This surfaces when you have more than one machine in the Spark cluster but aren't using a shared filesystem to propagate the data about pending commits into the final directory.

Make sure that fs.s3a.committer.staging.tmp.path points to something in HDFS, not to paths local to the individual machines.
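
Something like this (a sketch; the HDFS path is made up):

from pyspark.sql import SparkSession

# Sketch: point the staging committer's temporary path at the shared
# cluster filesystem (HDFS) so every machine sees the pending-commit
# data; the path here is made up.
spark = (
    SparkSession.builder
    .config("spark.hadoop.fs.s3a.committer.name", "directory")
    .config("spark.hadoop.fs.s3a.committer.staging.tmp.path",
            "hdfs:///tmp/spark-staging")
    .getOrCreate()
)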

Not using HDFS? Well, you'd better make sure S3Guard is on (for consistent S3 listings); then I'd switch to the magic committer, which is pure S3, with no need for any cluster FS. Do not attempt to use it without S3Guard unless you like invalid answers.
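
A sketch of that combination (the DynamoDB table name and region are placeholders):

from pyspark.sql import SparkSession

# Sketch: magic committer plus S3Guard backed by DynamoDB; the table
# name and region are placeholders.
spark = (
    SparkSession.builder
    .config("spark.hadoop.fs.s3a.committer.name", "magic")
    .config("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
    .config("spark.hadoop.fs.s3a.metadatastore.impl",
            "org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore")
    .config("spark.hadoop.fs.s3a.s3guard.ddb.table", "my-s3guard-table")
    .config("spark.hadoop.fs.s3a.s3guard.ddb.region", "us-east-1")
    .getOrCreate()
)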

W.r.t. why there is no spark-hadoop-cloud artifact: it didn't get built in the release. The fact that it adds the entire AWS SDK to the download is probably a factor. You can build it yourself, though; it is probably safer to do that than to mix Spark artifacts.

Upvotes: 1
