Reputation: 853
I have Spark Streaming code which works in client mode: it reads data from Kafka, does some processing, and uses spark-cassandra-connector to insert the data into Cassandra.
When I use "--deploy-mode cluster", the data does not get inserted, and I get the following error:
Exception in thread "streaming-job-executor-53" java.lang.NoClassDefFoundError: com/datastax/spark/connector/ColumnSelector
    at com.enerbyte.spark.jobs.wattiopipeline.WattiopipelineStreamingJob$$anonfun$main$2.apply(WattiopipelineStreamingJob.scala:94)
    at com.enerbyte.spark.jobs.wattiopipeline.WattiopipelineStreamingJob$$anonfun$main$2.apply(WattiopipelineStreamingJob.scala:88)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: com.datastax.spark.connector.ColumnSelector
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
I added the dependency for the connector like this:
"com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0" % "provided"
This is my application code:
val measurements = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
    ssc, kafkaConfig, Set("wattio"))
  .map {
    case (k, v) =>
      val decoder = new AvroDecoder[WattioMeasure](null, WattioMeasure.SCHEMA$)
      decoder.fromBytes(v)
  }

// inserting into WattioRaw
WattioFunctions.run(WattioFunctions.processWattioRaw(measurements))(
  (rdd: RDD[WattioTenantRaw], t: Time) => {
    rdd.cache()
    // get all the different tenants
    val differentTenants = rdd.map(a => a.tenant).distinct().collect()
    // for each tenant, create the keyspace value and flush to Cassandra
    differentTenants.foreach(tenant => {
      val keyspace = tenant + "_readings"
      rdd.filter(a => a.tenant == tenant).map(s => s.wattioRaw).saveToCassandra(keyspace, "wattio_raw")
    })
    rdd.unpersist(true)
  }
)
ssc.checkpoint("/tmp")
ssc.start()
ssc.awaitTermination()
Upvotes: 1
Views: 558
Reputation: 39501
The provided scope means you expect the JDK or a container to supply the dependency at runtime, so that dependency's jar will not be part of the final application war/jar you are building; hence the error.
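For example, a build.sbt sketch along these lines (only the connector coordinates come from the question; the Spark and Scala versions are assumptions, and it presumes sbt-assembly is enabled) keeps Spark itself "provided" while letting sbt-assembly bundle the connector into the uber jar:

// build.sbt sketch -- Scala/Spark versions below are placeholders
scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  // Spark is supplied by the cluster at runtime, so it stays "provided"
  "org.apache.spark" %% "spark-streaming" % "1.5.0" % "provided",
  // no "provided" here, so the connector ends up inside the assembly jar
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0"
)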
Upvotes: 0
Reputation: 853
Actually I solved it by removing "provided" from the dependency list, so that sbt packaged spark-cassandra-connector into my assembly jar.
Interestingly, in my launch script, even when I tried to use
spark-submit --repositories "location of my artifactory repository" --packages "spark-cassandra-connector"
or
spark-submit --jars "spark-cassandra-connector.jar"
both of them failed!
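For reference, a sketch of what the full forms of those two attempts would look like (the repository URL, the _2.10 suffix and the jar paths are placeholders; --packages expects full groupId:artifactId:version coordinates):

# sketch only -- repository URL, Scala suffix and jar names are placeholders
spark-submit --deploy-mode cluster \
  --repositories "http://my-artifactory-host/artifactory/libs-release" \
  --packages "com.datastax.spark:spark-cassandra-connector_2.10:1.5.0" \
  --class com.enerbyte.spark.jobs.wattiopipeline.WattiopipelineStreamingJob \
  wattiopipeline-assembly.jar

spark-submit --deploy-mode cluster \
  --jars "/path/to/spark-cassandra-connector_2.10-1.5.0.jar" \
  --class com.enerbyte.spark.jobs.wattiopipeline.WattiopipelineStreamingJob \
  wattiopipeline-assembly.jar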
Upvotes: 0
Reputation: 149538
You need to make sure your JAR is available to the workers. The Spark master will open up a file server once execution of the job starts.
You need to specify the path to your uber jar either by using SparkContext.setJars, or via the --jars flag passed to spark-submit.
When using spark-submit, the application jar along with any jars included with the --jars option will be automatically transferred to the cluster. Spark uses the following URL scheme to allow different strategies for disseminating jars
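A minimal sketch of the programmatic option (note that in the Scala API the jar list is set on SparkConf via setJars; the app name, jar path and batch interval below are placeholders):

// sketch: shipping the uber jar to the workers programmatically
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("WattiopipelineStreamingJob")              // placeholder app name
  .setJars(Seq("/path/to/wattiopipeline-assembly.jar"))  // served to the workers by Spark's file server

val ssc = new StreamingContext(conf, Seconds(10))        // placeholder batch interval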
Upvotes: 1