addmeaning

Reputation: 1398

Spark throws ClassNotFoundException, but the class is inside a JAR I am submitting to Spark

Spark Structured Streaming throws a ClassNotFoundException, but the class is in the JAR that is passed to spark-submit.

Hello, I have a module called output-jms that loads data from Kafka (using spark.readStream.format("kafka")), transforms it, and pushes it to an ActiveMQ JMS topic.

When I compile my project, it produces an output-jms.jar file.

This output-jms.jar file is passed to spark-submit.

output-jms.jar contains the class JmsWriter (defined in JmsWriter.scala).

When I run the application, I receive this exception:

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:357)
    ... 40 more

Caused by: java.lang.ClassNotFoundException: com.addmeaning.output.jms.spark.service.JmsWriter
    at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Class.java:398)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
    at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1995)
    at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1862)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2169)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
    at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2102)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:493)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:451)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

This is the function I run to start the streaming query:

  def writeStreamData(inputDF: DataFrame, serviceDocument: OutputJmsDocument): StreamingQuery = {
    val target = serviceDocument.getDetails.getTarget
    val userName = target.getUsername
    val password = target.getPassword
    val brokerUrl = target.getBrokerUrl
    val clientId = target.getClientId
    val topic = target.getTopic
    try {
      inputDF
        .repartition(1) // collapse to one partition so a single JMS connection handles all rows
        .select(col("_value"))
        .writeStream
        .queryName(target.getQueryName)
        .option("checkpointLocation", target.getCheckpointLocation)
        .trigger(
          serviceDocument.getExecutionType match {
            case ExecutionType.STREAMING =>
              Trigger.ProcessingTime(target.getTriggerInterval)
            case _ => Trigger.Once()
          }
        )
        // JmsWriter is serialized on the driver and deserialized on the executors;
        // the stack trace above shows the ClassNotFoundException thrown during that deserialization
        .foreach(new JmsWriter(userName, password, brokerUrl, clientId, topic))
        .start()
    } catch {
      case e: Throwable =>
        logError("Error writing stream: " + e)
        throw e
    }
  }

This is the code of JmsWriter:

class JmsWriter(username: String, password: String, brokerUrl: String, clientId: String, topic: String) extends ForeachWriter[Row] with Logging {

  val jmsConnectionFactory = JmsConnector.initActiveMqConnectionFactory(username, password, brokerUrl, clientId)

  // JMS objects are not serializable, so they are marked @transient
  // and created per partition in open()
  @transient var connection: Connection = _
  @transient var jmsSession: Session = _
  @transient var jmsProducer: MessageProducer = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    connection = {
      val cn = jmsConnectionFactory.createConnection()
      cn.setClientID(clientId)
      cn.start()
      logInfo("ActiveMQ connection created")
      cn
    }
    jmsSession = JmsConnector.initActiveMqSession(connection)
    jmsProducer = JmsConnector.initActiveMqProducer(jmsSession, topic)
    true // returning true tells Spark to process this partition's rows
  }

  override def process(value: Row): Unit =
    jmsProducer.send(jmsSession.createTextMessage(value.mkString("")))

  override def close(errorOrNull: Throwable): Unit = {
    if (errorOrNull != null) logError("Error during writing the stream: " + errorOrNull)
    jmsProducer.close()
    jmsSession.close()
    connection.close()
  }
}

What am I doing wrong? How do I get rid of the ClassNotFoundException? If you need any additional information, please let me know.

Upvotes: 0

Views: 707

Answers (2)

addmeaning

Reputation: 1398

Issue resolved. Spark was unable to find the class because the Spring Boot Maven plugin repackages the JAR and places the application's classes under the BOOT-INF/classes/ folder inside it, where Spark's classloader does not look for them. I did not suspect that because the Spark code was in a separate module.
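For illustration, a Spring Boot repackaged JAR lays out classes roughly like this (a hypothetical listing, not taken from the actual build output):

    $ jar -tvf output-jms.jar
    ...
    BOOT-INF/classes/com/addmeaning/output/jms/spark/service/JmsWriter.class
    ...

Spark expects to find the class at com/addmeaning/output/jms/spark/service/JmsWriter.class, at the top level of the JAR.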

Using the Maven Shade plugin to build the fat JAR instead solved it.
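For reference, a minimal maven-shade-plugin configuration looks roughly like this (a sketch; the exact version and any relocations or filters depend on the project):

    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.2.4</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
        </execution>
      </executions>
    </plugin>

Unlike Spring Boot's repackaging, shade merges all classes into a flat JAR, so Spark's classloader can resolve them directly.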

Upvotes: 0

Ranga Reddy

Reputation: 3066

The first thing to check is whether the class is actually present in the JAR:

    jar -tvf jarname.jar | grep your_class
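For the JAR in question, that would be something like:

    jar -tvf output-jms.jar | grep JmsWriter

If the class shows up only under a BOOT-INF/classes/ prefix rather than at the top level, the problem is the packaging, as described in the other answer.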

Upvotes: 1
