Reputation: 287
I am trying to use the S3A Partitioned (or Directory, since I just need to confirm that the committer is working as expected) committer with Spark. I am following this link, based on which it should be pretty simple; however, each issue I resolve leads to a new one.
Code used for testing (inside spark-shell):
val sourceDF = spark.range(0, 10000)
val datasets = "s3a://bucket-name/test"
sourceDF.write.format("orc").save(datasets + "orc")
spark-defaults.conf is:
spark.hadoop.fs.s3a.committer.name directory
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
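For completeness, here is a sketch of how the same options could be set programmatically when building the SparkSession yourself (e.g. in an application rather than spark-shell). The appName is just a placeholder, and the mapreduce.outputcommitter.factory.scheme.s3a binding is taken from the Hadoop S3A committer documentation; I am assuming it also applies to this setup:

import org.apache.spark.sql.SparkSession

// Sketch only: the committer settings from spark-defaults.conf applied at
// session build time. The s3a committer-factory binding is an assumption
// based on the Hadoop S3A committer docs, not something I have verified here.
val sparkSession = SparkSession.builder()
  .appName("s3a-committer-test")
  .config("spark.hadoop.fs.s3a.committer.name", "directory")
  .config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
    "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
  .config("spark.sql.sources.commitProtocolClass",
    "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  .config("spark.sql.parquet.output.committer.class",
    "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
  .getOrCreate()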
Error 1:
scala> sourceDF.write.format("orc").save(datasets + "orc")
java.lang.NoClassDefFoundError:
org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:230)
at org.apache.spark.internal.io.FileCommitProtocol$.instantiate(FileCommitProtocol.scala:144)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:98)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:435)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:471)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:50)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:609)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:217)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 81 more
Then I copied spark-hadoop-cloud_2.11-2.3.1.3.0.2.0-50.jar from this link into the spark/jars folder.
This resolved the previous `NoClassDefFoundError` but produced a new one:
Error 2:
java.lang.NoClassDefFoundError:
org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:230)
at org.apache.spark.internal.io.FileCommitProtocol$.instantiate(FileCommitProtocol.scala:144)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:98)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
....
I can paste the full stack trace if needed.
After this, I copied hadoop-mapreduce-client-core-3.1.1.jar into the spark/jars folder and ran the test code in spark-shell again. This time I got the error below, and this is where I am stuck.
Error 3:
scala> sourceDF.write.format("orc").save(datasets + "orc")
java.lang.NoSuchMethodError:
org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.<init>(Ljava/lang/String;Ljava/lang/String;Z)V
at org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.<init>(PathOutputCommitProtocol.scala:60)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.spark.internal.io.FileCommitProtocol$.instantiate(FileCommitProtocol.scala:150)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:98)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:435)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:471)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:50)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:609)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:217)
... 48 elided
This looks like an incorrect-jar issue, but I am not able to find the correct one. This question is similar to a previous question, but I could not find a relevant answer there, hence posting again.
Upvotes: 5
Views: 3756
Reputation: 1061
All the new-committer configuration documentation I've read to date is missing one fundamental fact:
it promises that the cloud integration libraries will be bundled with Spark 3.0.0, but for now you have to add them yourself.
Under the cloud-integration Maven repositories there are multiple distributions supporting the committers; I found one that works with the directory committer.
Please also remember that the directory committer requires a shared filesystem such as HDFS or NFS (we use AWS EFS) to coordinate the Spark workers' writes to S3.
If it is not set up properly, the resulting S3 write location will contain an empty folder with a single _SUCCESS file in it.
If the committer is set up properly, the _SUCCESS file will contain the JSON status of the write. If it is empty (zero length), you are still using the default Spark S3 committer.
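As a quick sanity check, something along these lines (just a sketch, run from spark-shell, reusing the output path from the question's example) tells you which case you are in:

import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: inspect the _SUCCESS marker left by the job. The S3A committers
// write a JSON summary into it; the classic FileOutputCommitter leaves a
// zero-length file. The path below is the one from the question.
val successPath = new Path("s3a://bucket-name/testorc/_SUCCESS")
val fs = successPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
if (fs.getFileStatus(successPath).getLen == 0) {
  println("Zero-length _SUCCESS: the default committer was used")
} else {
  val in = fs.open(successPath)
  try println(scala.io.Source.fromInputStream(in).mkString)  // JSON summary
  finally in.close()
}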
Upvotes: 2
Reputation: 287
I was able to make this work. The issue was with my Spark version: I was using Spark 2.2.1, whereas at least Spark 2.3.1 is needed. The last error mentioned in the question was pointing to the wrong constructor. After digging a little bit, I found out that spark-core_2.11-2.2.1.jar has a two-parameter constructor, whereas spark-hadoop-cloud_2.11-2.3.1.3.0.2.0-50.jar expects a three-parameter constructor, which only arrived in spark-core_2.11-2.3.1.jar. After the version bump and a few tweaks, I was able to test this.
Run these commands to see the issue:
javap -classpath spark-core_2.11-2.2.1.jar org/apache/spark/internal/io/HadoopMapReduceCommitProtocol
javap -classpath spark-core_2.11-2.3.1.jar org/apache/spark/internal/io/HadoopMapReduceCommitProtocol
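Alternatively, here is a sketch of the same check done from inside spark-shell instead of with javap: list the constructors that are actually on the classpath and look for the three-parameter one.

// Sketch: print the constructors of HadoopMapReduceCommitProtocol visible to
// this spark-shell. On Spark 2.2.x only a two-parameter constructor shows up;
// Spark 2.3.x adds the (String, String, Boolean) constructor that
// PathOutputCommitProtocol needs.
classOf[org.apache.spark.internal.io.HadoopMapReduceCommitProtocol]
  .getConstructors
  .foreach(println)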
Just to let you know, I downloaded the Spark 2.3.1 distribution without a pre-built Hadoop, then configured it with Hadoop 3.1.0 jars, and was able to make it work.
Upvotes: 0