Reputation: 1398
Spark Structured Streaming throws ClassNotFoundException, but the class is in the JAR submitted to spark-submit.
Hello, I have a module called output-jms which loads data from Kafka (using spark.readStream.format("kafka")), transforms it, and pushes it to an ActiveMQ JMS topic. When I compile the project, it produces an output-jms.jar file, which is passed to spark-submit. output-jms.jar contains the JmsWriter class (defined in JmsWriter.scala).
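For reference, the jar is submitted roughly like this (the main class name and master are assumptions, not taken from the question):

spark-submit \
  --class com.addmeaning.output.jms.spark.Main \
  --master local[*] \
  output-jms.jar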
When I run the application, I receive this exception:
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:357)
... 40 more
Caused by: java.lang.ClassNotFoundException: com.addmeaning.output.jms.spark.service.JmsWriter
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:398)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1995)
at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1862)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2169)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2102)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:493)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:451)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
This is the function I run to start the streaming query:
def writeStreamData(inputDF: DataFrame, serviceDocument: OutputJmsDocument): StreamingQuery = {
  val target = serviceDocument.getDetails.getTarget
  val userName = target.getUsername
  val password = target.getPassword
  val brokerUrl = target.getBrokerUrl
  val clientId = target.getClientId
  val topic = target.getTopic
  try {
    inputDF
      .repartition(1)
      .select(col("_value"))
      .writeStream
      .queryName(target.getQueryName)
      .option("checkpointLocation", target.getCheckpointLocation)
      .trigger(
        serviceDocument.getExecutionType match {
          case ExecutionType.STREAMING =>
            Trigger.ProcessingTime(target.getTriggerInterval)
          case _ => Trigger.Once()
        }
      )
      .foreach(new JmsWriter(userName, password, brokerUrl, clientId, topic))
      .start()
  } catch {
    case e: Throwable =>
      logError("Error write stream: " + e)
      throw e
  }
}
This is the code of JmsWriter:
class JmsWriter(username: String, password: String, brokerUrl: String, clientId: String, topic: String)
  extends ForeachWriter[Row] with Logging {

  val jmsConnectionFactory = JmsConnector.initActiveMqConnectionFactory(username, password, brokerUrl, clientId)

  @transient var connection: Connection = _
  @transient var jmsSession: Session = _
  @transient var jmsProducer: MessageProducer = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    connection = {
      val cn = jmsConnectionFactory.createConnection()
      cn.setClientID(clientId)
      cn.start()
      logInfo("ActiveMQ connection created")
      cn
    }
    jmsSession = JmsConnector.initActiveMqSession(connection)
    jmsProducer = JmsConnector.initActiveMqProducer(jmsSession, topic)
    true
  }

  override def process(value: Row): Unit =
    jmsProducer.send(jmsSession.createTextMessage(value.mkString("")))

  override def close(errorOrNull: Throwable): Unit = {
    if (errorOrNull != null) logError("Error during writing the stream: " + errorOrNull)
    jmsProducer.close()
    jmsSession.close()
    connection.close()
  }
}
What am I doing wrong? How do I get rid of the ClassNotFoundException? If you need any additional information, please let me know.
Upvotes: 0
Views: 707
Reputation: 1398
Issue resolved. Spark was unable to find those classes because the Spring Boot Maven configuration repackaged the jar so that the application classes (including JmsWriter) ended up under the /BOOT-INF/ folder inside the jar, where Spark's classloader does not look. I did not suspect that, because the Spark code was in a separate module.
Repackaging with the Maven Shade Plugin, which keeps classes at the jar root, helped; a minimal sketch follows.
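For anyone hitting the same issue, here is a minimal sketch of a shade-plugin fragment for the pom.xml. This is not the asker's actual configuration; the exact transformers and filters a project needs are an assumption and will vary:

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.2.4</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <!-- Merge META-INF/services entries from all dependencies,
             which Spark data sources rely on. -->
        <transformers>
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
        <!-- Strip dependency signature files, which would otherwise
             make the shaded jar fail signature verification. -->
        <filters>
          <filter>
            <artifact>*:*</artifact>
            <excludes>
              <exclude>META-INF/*.SF</exclude>
              <exclude>META-INF/*.DSA</exclude>
              <exclude>META-INF/*.RSA</exclude>
            </excludes>
          </filter>
        </filters>
      </configuration>
    </execution>
  </executions>
</plugin>

Unlike the spring-boot-maven-plugin, the shade plugin produces a flat uber-jar, so classes like JmsWriter sit at the root of the archive where Spark's URLClassLoader can find them.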
Upvotes: 0
Reputation: 3066
The first thing you need to check is whether the class is actually present in the jar:
jar -tvf jarname.jar | grep your_class
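For the class in this question, that would be something like the following (the jar name is taken from the question; the diagnosis is an assumption based on the asker's resolution above):
jar -tvf output-jms.jar | grep JmsWriter
If the entry appears under a path such as BOOT-INF/classes/com/addmeaning/..., the class is in the jar but not at the root where Spark's classloader looks for it.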
Upvotes: 1