Reputation: 1693
I am trying to get started with Spark using Scala.
Originally, I was trying to write a streaming Kinesis consumer, following along with this official example. However, at this point I have reduced my error case to remove everything Kinesis-related except the package dependency, and the error remains the same.
I used SBT to generate an assembly JAR of my project. Then I tried to run it locally using spark-submit. (Detailed steps below.)
This consistently fails with a ClassNotFoundException, claiming that it cannot find the main class of my application. (Detailed output below.)
I should emphasize: to the extent of my understanding, I don't believe this is the same ClassNotFoundException that other posters have seen, and I don't believe this question is a duplicate of those questions. In particular, as far as I can tell, the class is present in the JAR under its expected name (verified below).
The exact steps: I build the assembly JAR with
sbt assembly
and then run it locally with
./bin/spark-submit --class "sparkpoc.KinesisExample" --master local[4] ~/git/ming-spark-poc/target/scala-2.11/ming-spark-poc-assembly-0.1.jar
(With appropriate substitutions.) This consistently fails as follows.
$ ./bin/spark-submit --class "sparkpoc.KinesisExample" --master local[4] ~/git/ming-spark-poc/target/scala-2.11/ming-spark-poc-assembly-0.1.jar
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.hadoop.security.authentication.util.KerberosUtil (file:/Users/ming/misc/spark-2.3.0-bin-hadoop2.7/jars/hadoop-auth-2.7.3.jar) to method sun.security.krb5.Config.getInstance()
WARNING: Please consider reporting this to the maintainers of org.apache.hadoop.security.authentication.util.KerberosUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2018-05-09 15:39:01 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
java.lang.ClassNotFoundException: sparkpoc.KinesisExample
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:466)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:566)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:499)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:374)
at org.apache.spark.util.Utils$.classForName(Utils.scala:235)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:836)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
2018-05-09 15:39:01 INFO ShutdownHookManager:54 - Shutdown hook called
2018-05-09 15:39:01 INFO ShutdownHookManager:54 - Deleting directory /private/var/folders/py/jrf50pwj1xdd4grjvlg07g580000gp/T/spark-c5f3bade-fbfe-4516-900e-99fee1b47366
build.sbt
name := "ming-spark-poc"
version := "0.1"
scalaVersion := "2.11.8"
val sparkVersion = "2.3.0"
libraryDependencies ++= Seq(
  // Spark itself is provided by spark-submit at runtime.
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
  // The Kinesis connector is not part of the Spark distribution, so it is bundled.
  "org.apache.spark" %% "spark-streaming-kinesis-asl" % sparkVersion
)

// Don't bundle the Scala library; spark-submit supplies it.
assemblyOption in assembly := (assemblyOption in assembly).value
  .copy(includeScala = false)

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
  case _ => MergeStrategy.first
}
I'm aware that setting assemblyMergeStrategy with a blanket catch-all, rather than falling back to the default strategy, is bad practice in production code (the safer pattern, which defers to the default strategy, appears further below). This was just a quick hack to get the project to build, and as far as I know, it does not relate to my current error.
assembly.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")
KinesisExample.scala
Originally, this was a Kinesis consumer. It's been reduced to a placeholder app that does nothing. The error has not changed.
package sparkpoc

import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KinesisExample {
  def main(args: Array[String]): Unit = {
    val batchInterval = Seconds(5)
    val sparkConf = new SparkConf().setAppName("SparkPocKinesisExample")
    val streamingContext = new StreamingContext(sparkConf, batchInterval)
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
I can run the official examples from the prepackaged JAR with seemingly no issues.
$ ./bin/run-example SparkPi 10
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.hadoop.security.authentication.util.KerberosUtil (file:/Users/ming/misc/spark-2.3.0-bin-hadoop2.7/jars/hadoop-auth-2.7.3.jar) to method sun.security.krb5.Config.getInstance()
WARNING: Please consider reporting this to the maintainers of org.apache.hadoop.security.authentication.util.KerberosUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2018-05-09 16:14:07 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2018-05-09 16:14:08 INFO SparkContext:54 - Running Spark version 2.3.0
2018-05-09 16:14:08 INFO SparkContext:54 - Submitted application: Spark Pi
<SNIPPED>
As far as I know, the produced JAR file contains the expected class file. I verified this independently in two different ways.
First, I examined the contents of the JAR with jar -tf. It contains the expected class file in the expected location, as far as I can tell.
$ jar -tf ./target/scala-2.11/ming-spark-poc-assembly-0.1.jar | grep KinesisExample
org/apache/spark/examples/streaming/KinesisExampleUtils$$anonfun$getRegionNameByEndpoint$1.class
org/apache/spark/examples/streaming/KinesisExampleUtils$$anonfun$getRegionNameByEndpoint$2.class
org/apache/spark/examples/streaming/KinesisExampleUtils$$anonfun$getRegionNameByEndpoint$3.class
org/apache/spark/examples/streaming/KinesisExampleUtils$.class
org/apache/spark/examples/streaming/KinesisExampleUtils.class
sparkpoc/KinesisExample$.class
sparkpoc/KinesisExample.class
Second, I extracted the contents of the JAR with unzip and examined them manually. It contains the expected class file in the expected location, as far as I can tell.
While I do not expect anything in this project to depend on the current working directory, I repeated the same steps with the Spark installation root as the current working directory, with no change in outcome.
I tried running the generated JAR directly. While I didn't expect this to actually run the Spark application correctly, I thought it might provide insight into what is going on with class resolution. It fails as follows.
$ java -jar ./target/scala-2.11/ming-spark-poc-assembly-0.1.jar "sparkpoc.KinesisExample"
Error: Could not find or load main class sparkpoc.KinesisExample
Caused by: java.lang.ClassNotFoundException: sparkpoc.KinesisExample
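In hindsight, a plausible explanation, though I couldn't confirm it at the time: with java -jar, the JVM takes the main class from the Main-Class attribute in META-INF/MANIFEST.MF and treats the trailing class name as an ordinary program argument, so this failure suggests the assembly's manifest lacks a usable Main-Class attribute. Assuming the standard unzip tool is available, the manifest can be inspected directly:
$ unzip -p ./target/scala-2.11/ming-spark-poc-assembly-0.1.jar META-INF/MANIFEST.MF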
I tried renaming the package the class is in, including at one point putting it in the top-level package (no package declaration). Each time, invoking spark-submit with the appropriate fully-qualified class name, I got the same error.
In case the assemblyMergeStrategy hack was breaking something indirectly, I tried replacing it with an explicit listing, as follows.
assemblyMergeStrategy in assembly := {
  case PathList("javax", "inject", _*) => MergeStrategy.last
  case PathList("org", "apache", _*) => MergeStrategy.last
  case PathList("org", "aopalliance", _*) => MergeStrategy.last
  case PathList("mime.types") => MergeStrategy.last
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}
I still got the same error. EDIT: This actually works as expected. I had a separate issue with a stale build artifact. See below.
Thank you in advance for any insights or advice you may have.
EDIT: The answer by Ahmad Ragab below is correct. There was indeed something wrong with my assemblyMergeStrategy, leading to a malformed output JAR. To the limited extent of my understanding, I believe I was clobbering some important metadata with the overly aggressive wildcard in assemblyMergeStrategy. The version that works is as follows.
assemblyMergeStrategy in assembly := {
  case PathList("javax", "inject", _*) => MergeStrategy.last
  case PathList("org", "apache", _*) => MergeStrategy.last
  case PathList("org", "aopalliance", _*) => MergeStrategy.last
  case PathList("mime.types") => MergeStrategy.last
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}
Notably, I tried this once before, and it somehow didn't work. My suspicion, although I cannot prove it retroactively, is that I was testing that change against a stale build artifact, so I was accidentally running an older version without those changes. After cleaning everything and rebuilding, it worked as expected.
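Concretely, the rebuild amounted to something like the following from the project root, followed by the same spark-submit invocation as before:
$ sbt clean assembly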
(Note that the app will still crash on startup due to there being no defined outputs, but of course, that failure is an expected one, and we have removed the unexpected one.)
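For completeness, a minimal sketch of how the placeholder could be given an output operation so that start() no longer fails, using a dummy queue stream (which, incidentally, is what the otherwise-unused mutable and RDD imports above are good for). These lines would go before streamingContext.start():

// Register a trivial output operation so start() succeeds.
// An empty queue produces empty batches; print() is the output.
val queue = mutable.Queue.empty[RDD[Int]]
val dummyStream = streamingContext.queueStream(queue)
dummyStream.print()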
Upvotes: 5
Views: 1641
Reputation: 1117
Something looks potentially strange about that assemblyMergeStrategy. Try:
assemblyMergeStrategy in assembly := {
  // Unlike the original, this discards everything under META-INF,
  // not just MANIFEST.MF.
  case PathList("META-INF", _*) => MergeStrategy.discard
  case _ => MergeStrategy.first
}
Second, you may need to explicitly set your main class in the assembly options so that the proper manifest gets created, though, to be fair, I can't prove this is required. In the past I have done the following with some success:
mainClass in assembly := Some("sparkpoc.KinesisExample")
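With that set, sbt-assembly should write the Main-Class attribute into the assembly's manifest, which would also make a plain java -jar launch of the JAR meaningful.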
Finally, one way you might confirm that your JAR is properly created is by executing:
java -classpath ming-spark-poc-assembly-0.1.jar "sparkpoc.KinesisExample"
Hopefully, some of this leads you in the right direction.
Upvotes: 4