Mark Ballenger

Reputation: 11

DataProc - SparkException: Failed to register classes with Kryo - java.lang.ClassNotFoundException

I am getting a ClassNotFoundException when trying to use KryoSerializer for a DataFrame on Dataproc, for a class that is part of the main JAR sent to spark-submit. This only happens when using spark:spark.submit.deployMode "cluster" and a master setting of "yarn". It does not happen with a master setting like "local[*]", which leads me to believe the executors' classpath is missing the application JAR or something similar.

I've tried adding the MAIN_JAR to the classpath by experimenting with the following properties, but I keep ending up at the same ClassNotFoundException. Everything I've read suggests this shouldn't be necessary, however. (A sketch of how I set these follows the list.)

"spark:spark.executor.extraClassPath"
"spark:spark.driver.extraClassPath"
"spark:spark.driver.userClassPathFirst"
"spark:spark.executor.userClassPathFirst"
"spark:spark.yarn.dist.jars"
"spark:spark.yarn.jars"
"spark:spark.jars"

I'm using the latest 2.1 Dataproc image, and I keep hitting this very frustrating ClassNotFoundException while trying to Kryo-serialize my DataFrame. The thing is, the class exists in the main JAR submitted via spark-submit.
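
For context, the driver-side Kryo setup looks roughly like this (a minimal sketch, not my exact code; MyClass is a hypothetical stand-in for the redacted class in the stack trace below):

    import java.io.Serializable;
    import org.apache.spark.SparkConf;
    import org.apache.spark.sql.SparkSession;

    // Hypothetical stand-in for the redacted application class.
    class MyClass implements Serializable {}

    public class KryoSetup {
        public static SparkSession build() {
            SparkConf conf = new SparkConf()
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                // Each class registered here is loaded via Class.forName in
                // KryoSerializer.newKryo on every executor -- the lookup that
                // fails in cluster mode per the stack trace below.
                .registerKryoClasses(new Class<?>[] { MyClass.class });
            return SparkSession.builder().config(conf).getOrCreate();
        }
    }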

Here is the Java code from the service that submits the job (this is what ultimately drives spark-submit on the cluster). The JAR is on GCS and read through the Cloud Storage connector, but I know that part works, since this only happens in cluster mode:

SparkJob sparkJob = SparkJob.newBuilder().setMainJarFileUri(MAIN_JAR).build();
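
In fuller form, the submission looks roughly like this (a sketch assuming the google-cloud-dataproc Java client; the project, region, cluster name, and JAR URI are placeholders):

    import com.google.cloud.dataproc.v1.Job;
    import com.google.cloud.dataproc.v1.JobControllerClient;
    import com.google.cloud.dataproc.v1.JobControllerSettings;
    import com.google.cloud.dataproc.v1.JobPlacement;
    import com.google.cloud.dataproc.v1.SparkJob;

    public class SubmitSparkJob {
        public static void main(String[] args) throws Exception {
            String projectId = "my-project";           // placeholder
            String region = "us-central1";             // placeholder
            String clusterName = "my-cluster";         // placeholder
            String mainJar = "gs://my-bucket/app.jar"; // placeholder GCS URI

            // Point the client at the regional Dataproc endpoint.
            JobControllerSettings settings = JobControllerSettings.newBuilder()
                .setEndpoint(region + "-dataproc.googleapis.com:443")
                .build();

            try (JobControllerClient client = JobControllerClient.create(settings)) {
                SparkJob sparkJob = SparkJob.newBuilder()
                    .setMainJarFileUri(mainJar)
                    .build();
                Job job = Job.newBuilder()
                    .setPlacement(JobPlacement.newBuilder().setClusterName(clusterName).build())
                    .setSparkJob(sparkJob)
                    .build();
                Job submitted = client.submitJob(projectId, region, job);
                System.out.println("Submitted job: " + submitted.getReference().getJobId());
            }
        }
    }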

Below is the specific image URI of the Dataproc image I'm using. This is the explicit image URI, but I've also tried some of the other existing 2.1 images, such as the Debian-based ones.

https://www.googleapis.com/compute/v1/projects/cloud-dataproc/global/images/dataproc-2-1-ubu20-20221201-035100-rc01

For reference, here's the gcloud command I'm using to list the stable 2.1 images and get their URIs:

gcloud compute images list --uri --project cloud-dataproc --filter "labels.goog-dataproc-version ~ ^2.1.0" --sort-by=~creationTimestamp

Here is part of the stack trace:

22/12/21 03:57:18 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (<GCP project resource details redacted> executor 1): java.io.IOException: org.apache.spark.SparkException: Failed to register classes with Kryo
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1477)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:228)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:105)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:84)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Failed to register classes with Kryo
    at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$5(KryoSerializer.scala:183)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:233)
    at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:171)
    at org.apache.spark.serializer.KryoSerializer$$anon$1.create(KryoSerializer.scala:102)
    at com.esotericsoftware.kryo.pool.KryoPoolQueueImpl.borrow(KryoPoolQueueImpl.java:48)
    at org.apache.spark.serializer.KryoSerializer$PoolWrapper.borrow(KryoSerializer.scala:109)
    at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:346)
    at org.apache.spark.serializer.KryoDeserializationStream.<init>(KryoSerializer.scala:302)
    at org.apache.spark.serializer.KryoSerializerInstance.deserializeStream(KryoSerializer.scala:436)
    at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:336)
    at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$4(TorrentBroadcast.scala:259)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$2(TorrentBroadcast.scala:233)
    at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$1(TorrentBroadcast.scala:228)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1470)
    ... 11 more
Caused by: java.lang.ClassNotFoundException: <<redacted>.MyClass In the Main JAR sent to spark-submit>
    at java.base/java.lang.ClassLoader.findClass(ClassLoader.java:719)
    at org.apache.spark.util.ParentClassLoader.findClass(ParentClassLoader.java:35)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
    at org.apache.spark.util.ParentClassLoader.loadClass(ParentClassLoader.java:40)
    at org.apache.spark.util.ChildFirstURLClassLoader.loadClass(ChildFirstURLClassLoader.java:48)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Class.java:398)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:220)
    at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$6(KryoSerializer.scala:174)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$5(KryoSerializer.scala:173)
    ... 27 more

Also of note, this is my pom.xml. I use the provided scope for all the libraries already installed on the cluster:

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.3.0</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.3.0</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.google.cloud.spark</groupId>
            <artifactId>spark-bigquery-with-dependencies_2.12</artifactId>
            <version>0.27.1</version>
            <scope>provided</scope>
        </dependency>

I also tried experimenting with initialization actions that copy the JAR to the executors' classpath manually with gsutil, but that seems like it shouldn't be necessary, and it yields the same result anyway.

Upvotes: 1

Views: 393

Answers (0)
