Reputation: 155
I am running a Spark job on a self-managed cluster (essentially a local environment) that accesses buckets on Google Cloud Storage.
❯ spark-submit --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/
Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 1.8.0_292
Branch HEAD
Compiled by user centos on 2021-05-24T04:27:48Z
Revision de351e30a90dd988b133b3d00fa6218bfcaba8b8
Url https://github.com/apache/spark
Type --help for more information.
If I run the job with the following command, using a locally downloaded gcs-connector jar, it finishes successfully.
spark-submit \
  --name CreateAllDataDFWithSpark \
  --jars ./gcs-connector-hadoop3-2.2.2.jar \
  --packages org.apache.spark:spark-avro_2.12:3.1.2 \
  --conf spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem \
  <path_to>/.cache/pypoetry/virtualenvs/poc-TZFypELR-py3.7/lib/python3.7/site-packages/luigi/contrib/pyspark_runner.py \
  /tmp/CreateAllDataDFWithSpark78itslb5/CreateAllDataDFWithSpark.pickle
On the other hand, if I run the job without downloading the gcs-connector beforehand, as below,
spark-submit \
  --name CreateAllDataDFWithSpark \
  --packages org.apache.spark:spark-avro_2.12:3.1.2,com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.2 \
  --conf spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem \
  --conf spark.hadoop.fs.AbstractFileSystem.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS \
  <path_to>/.cache/pypoetry/virtualenvs/poc-TZFypELR-py3.7/lib/python3.7/site-packages/luigi/contrib/pyspark_runner.py \
  /tmp/CreateAllDataDFWithSpark1gf54xue/CreateAllDataDFWithSpark.pickle
it gives the following error.
...
py4j.protocol.Py4JJavaError: An error occurred while calling o31.load.
: java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:135)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3302)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:46)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:377)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:325)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:307)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:307)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:239)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
... 24 more
Caused by: java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;JJ)V
at com.google.cloud.hadoop.gcsio.cooplock.CooperativeLockingOptions$Builder.build(CooperativeLockingOptions.java:58)
at com.google.cloud.hadoop.gcsio.cooplock.CooperativeLockingOptions.<clinit>(CooperativeLockingOptions.java:33)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration.<clinit>(GoogleHadoopFileSystemConfiguration.java:383)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.<init>(GoogleHadoopFileSystemBase.java:246)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.<init>(GoogleHadoopFileSystem.java:58)
... 29 more
I do not understand why the second command does not work.
I appreciate any suggestions or comments. Thanks!
Upvotes: 6
Views: 3591
Reputation: 10677
As mentioned in the comments, this stems from a Guava version incompatibility between what the GCS connector depends on and what is bundled in your Spark distribution. Specifically, gcs-connector hadoop3-2.2.2 depends on Guava 30.1-jre, whereas Spark 3.1.2 ships Guava 14.0.1 as a "provided" dependency.
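If you want to double-check this on your own machine, a rough sanity check like the following (a sketch; the path assumes SPARK_HOME points at your Spark 3.1.2 installation) shows which Guava the driver classpath already carries:

ls "$SPARK_HOME/jars" | grep -i guava
# expected: something like guava-14.0.1.jar, which lacks the
# Preconditions.checkArgument(boolean, String, long, long) overload
# the connector calls -- hence the NoSuchMethodError in your stack trace.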
Between the two commands, it was more or less luck of the draw that classpath loading happened in the right order for your first approach to work, and it could still start failing unexpectedly once other jars are added.
Ideally you'll want to host your own jarfile anyway, to minimize runtime dependencies on external repositories (e.g., Maven Central), so pre-installing the jarfile is the right approach. When you do that, consider using the full shaded jarfile (also available on Maven Central) instead of the minimal GCS connector jarfile, to avoid classloading version issues in the future.
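As a sketch of what that could look like for your second command (the download URL and the -shaded jar name are my assumptions based on the usual Maven Central layout for com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.2; verify them for the version you actually use):

# download the shaded connector once, then ship it with --jars instead of --packages
wget https://repo1.maven.org/maven2/com/google/cloud/bigdataoss/gcs-connector/hadoop3-2.2.2/gcs-connector-hadoop3-2.2.2-shaded.jar

spark-submit \
  --name CreateAllDataDFWithSpark \
  --jars ./gcs-connector-hadoop3-2.2.2-shaded.jar \
  --packages org.apache.spark:spark-avro_2.12:3.1.2 \
  --conf spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem \
  --conf spark.hadoop.fs.AbstractFileSystem.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS \
  <path_to>/.cache/pypoetry/virtualenvs/poc-TZFypELR-py3.7/lib/python3.7/site-packages/luigi/contrib/pyspark_runner.py \
  /tmp/CreateAllDataDFWithSpark1gf54xue/CreateAllDataDFWithSpark.pickle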
Upvotes: 5