I'm pretty new to Spark. I'm trying to receive a DStream of JSON messages from a Kafka topic, and I want to parse the content of each JSON message. The JSON I receive looks like this:
```json
{"type":"position","ident":"IBE32JZ","air_ground":"A","alt":"34000","clock":"1409733420","id":"IBE32JZ-1409715361-ed-0002:0","gs":"446","heading":"71","lat":"44.50987","lon":"2.98972","reg":"ECJRE","squawk":"1004","updateType":"A","altChange":" "}
```
For now I'm only trying to extract the ident field, and I'm using the lift-json library to parse the data. My program looks like this:
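```scala
import net.liftweb.json._
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ScalaExample {
  val kafkaHost = "localhost"
  val kafkaPort = 9092
  val zookeeperHost = "localhost"
  val zookeeperPort = 2181

  implicit val formats = DefaultFormats

  case class PlaneInfo(ident: String)

  // Parse one JSON message and extract its "ident" field through the PlaneInfo case class
  def parser(json: String): String = {
    val parsedJson = parse(json)
    val m = parsedJson.extract[PlaneInfo]
    m.ident
  }

  def main(args: Array[String]): Unit = {
    val zkQuorum = "localhost:2181"
    val group = "myGroup"
    val topic = Map("flightStatus" -> 1) // topic name -> number of consumer threads
    val sparkContext = new SparkContext("local[4]", "KafkaConsumer")
    val ssc = new StreamingContext(sparkContext, Seconds(10))
    // Each Kafka record arrives as a (key, message) pair
    val json = KafkaUtils.createStream(ssc, zkQuorum, group, topic)
    // The original listing was cut off after "val id ="; assumed completion:
    val id = json.map(_._2).map(parser)
    id.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```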
But it throws the exception below:
```
java.lang.NoClassDefFoundError: scala/reflect/ClassManifest
    at net.liftweb.json.JsonAST$JValue.extract(JsonAST.scala:300)
    at aero.catec.stratio.ScalaExample$.parser(ScalaExample.scala:33)
    at aero.catec.stratio.ScalaExample$$anonfun$2.apply(ScalaExample.scala:48)
    at aero.catec.stratio.ScalaExample$$anonfun$2.apply(ScalaExample.scala:48)
    at scala.collection.Iterator$$anon$
    at scala.collection.Iterator$$anon$
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1003)
    at org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1003)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
    at org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:575)
    at org.apache.spark.scheduler.DAGScheduler$$anon$
Caused by: java.lang.ClassNotFoundException: scala.reflect.ClassManifest
    at Method)
    at java.lang.ClassLoader.loadClass(
    at java.lang.ClassLoader.loadClass(
```
The thing is, if I run the same code without Spark (reading from a file), it works perfectly. The problem only starts when I put it in a Spark program. Also, if I change the parser function to something like this:
```scala
def parser(json: String): JValue = {
  val parsedJson = parse(json)
  parsedJson \\ "ident" // returns the JValue, no extraction into a case class
}
```
it works too. But as soon as I try to extract the actual String, I get the same error.
Thanks for your help; I hope I've explained it well.
Oh, a good old issue :-)
Basically, this indicates a Scala version mismatch: one of your dependencies (judging by the stack trace, lift-json) was compiled against a different Scala version than the one your program runs on. Are you on 2.10?
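To answer the "are you on 2.10?" question from the same classpath the job uses, a small check like the one below can help. It is a sketch using only the standard library; the object `ScalaVersionCheck` and its helpers are hypothetical names. It prints the Scala runtime version and whether the class `scala.reflect.ClassManifest` can be loaded. On Scala 2.10 and later, `ClassManifest` is only a deprecated type alias for `ClassTag`, so no class file with that name exists, which matches the `NoClassDefFoundError` above.

```scala
object ScalaVersionCheck {
  /** Returns true if a class with the given binary name can be loaded. */
  def classPresent(name: String): Boolean =
    try { Class.forName(name); true }
    catch { case _: ClassNotFoundException | _: LinkageError => false }

  /** "2.10.4" -> "2.10": the artifact suffix that sbt's %% appends for Scala 2.x. */
  def binaryVersion(full: String): String =
    full.split('.').take(2).mkString(".")

  def main(args: Array[String]): Unit = {
    val full = scala.util.Properties.versionNumberString
    println(s"Scala runtime: $full (Scala 2.x artifacts should end in _${binaryVersion(full)})")
    // A library compiled against Scala 2.9 (e.g. an old lift-json build) links against this class
    println(s"scala.reflect.ClassManifest loadable: ${classPresent("scala.reflect.ClassManifest")}")
  }
}
```

If the runtime reports 2.10.x and the class is not loadable, every Scala dependency (lift-json included) needs to be the `_2.10` build.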
Try Googling the phrase "NoClassDefFoundError: scala/reflect/ClassManifest"; I'm sure you'll find plenty of explanations of the issue.
This happens because you are missing the scala-reflect dependency needed to serialize/deserialize the record. Try adding the scala-reflect jar that matches the Scala version your Spark build uses.
Tip: "org.scala-lang" % "scala-reflect" % Version.scala
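In an sbt build, the tip above and the version-matching advice from the other answer could be combined roughly as follows. This is a sketch: the Scala, Spark and lift-json version numbers are assumptions, so substitute the ones your cluster actually uses. `scalaVersion.value` stands in for the `Version.scala` constant from the tip.

```scala
// build.sbt (sketch; all version numbers below are assumptions)
scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  // %% appends the Scala binary suffix (e.g. _2.10), keeping every artifact on the same Scala version
  "org.apache.spark" %% "spark-streaming"       % "1.0.2" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.0.2",
  "net.liftweb"      %% "lift-json"             % "2.5.1",
  // The tip: scala-reflect pinned to the project's own Scala version
  "org.scala-lang"   %  "scala-reflect"         % scalaVersion.value
)
```

Using `%%` rather than a hard-coded `_2.9.x` artifact name is what prevents a lift-json built for an older Scala from ending up on the classpath.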