Adam
Adam

Reputation: 442

scala spark NoClassDefFoundError - InitialPositionInStream

Deploying a spark application written in scala to an EMR cluster with the following command and i cannot figure out why I am receiving a missing dependency error message when deployed to the EMR cluster instance.

error message:

User class threw exception: java.lang.NoClassDefFoundError: com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStream
aws emr add-steps --cluster-id j-xxxxxxx --steps Type=spark,Name=ScalaStream,Args=[\
--class,"ScalaStream",\
--deploy-mode,cluster,\
--master,yarn,\
--jars,s3://xxx.xxx.xxx/aws-java-sdk-1.11.715.jar,\
--conf,spark.yarn.submit.waitAppCompletion=false,\
s3://xxx.xxxx.xxxx/simple-project_2.12-1.0.jar\
],ActionOnFailure=CONTINUE

and sbt file

name := "Simple Project"

version := "1.0"

scalaVersion := "2.12.8"

libraryDependencies += "org.apache.spark" % "spark-sql_2.12" % "2.4.4"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.12" % "2.4.4"
libraryDependencies += "com.amazonaws" % "aws-java-sdk" % "1.11.715"
libraryDependencies += "org.apache.spark" % "spark-streaming-kinesis-asl_2.12" % "2.4.4"

partial code below

...

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

...

        val streamingContext = new StreamingContext(sparkContext, batchInterval)

        // Populate the appropriate variables from the given args
        val streamAppName = "xxxxxx"
        val streamName = "xxxxxx"
        val endpointUrl = "https://kinesis.xxxxx.amazonaws.com"
        val regionName = "xx-xx-x"
        val initialPosition = InitialPositionInStream.LATEST
        val checkpointInterval = batchInterval
        val storageLevel = StorageLevel.MEMORY_AND_DISK_2

        val kinesisStream = KinesisUtils.createStream(streamingContext, streamAppName, streamAppName, endpointUrl, regionName, initialPosition, checkpointInterval, storageLevel)

        val initialPosition = InitialPositionInStream.LATEST
        val checkpointInterval = batchInterval
        val storageLevel = StorageLevel.MEMORY_AND_DISK_2

        val kinesisStream = KinesisUtils.createStream(streamingContext, streamAppName, streamAppName, endpointUrl, regionName, initialPosition, checkpointInterval, storageLevel)

20/02/05 21:43:10 ERROR ApplicationMaster: User class threw exception: java.lang.NoClassDefFoundError: com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStream
java.lang.NoClassDefFoundError: com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStream
    at ScalaStream$.main(stream.scala:32)
    at ScalaStream.main(stream.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:684)
Caused by: java.lang.ClassNotFoundException: com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 7 more
20/02/05 21:43:10 INFO ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.lang.NoClassDefFoundError: com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStream
    at ScalaStream$.main(stream.scala:32)
    at ScalaStream.main(stream.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:684)
Caused by: java.lang.ClassNotFoundException: com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 7 more
)

I've tried including the aws dependencies both in the sbt file and also --jars paramater of spark-submit but cannot see why the dependency is missing?

Upvotes: 0

Views: 410

Answers (1)

Adam
Adam

Reputation: 442

fixed by updating the following

sbt

name := "Simple Project"

version := "1.0"

scalaVersion := "2.12.8"

libraryDependencies += "org.apache.spark" % "spark-sql_2.12" % "2.4.4"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.12" % "2.4.4"
libraryDependencies += "org.apache.spark" % "spark-streaming-kinesis-asl_2.12" % "2.4.4"

deploy script

aws emr add-steps --cluster-id j-xxxxxxx --steps Type=spark,Name=ScalaStream,Args=[\
--class,"ScalaStream",\
--deploy-mode,cluster,\
--master,yarn,\
--packages,\'org.apache.spark:spark-streaming-kinesis-asl_2.11:2.4.0,org.postgresql:postgresql:42.2.9,com.facebook.presto:presto-jdbc:0.60\',\
--conf,spark.yarn.submit.waitAppCompletion=false,\
--conf,yarn.log-aggregation-enable=true,\
--conf,spark.dynamicAllocation.enabled=true,\
--conf,spark.cores.max=4,\
--conf,spark.network.timeout=300,\
s3://xxx.xxx/simple-project_2.12-1.0.jar\
],ActionOnFailure=CONTINUE

the key being the --packages flag added to the aws emr add-steps. Mistakenly thought sbt package bundled the required dependencies.

Upvotes: 3

Related Questions