Reputation: 546
I am trying to consume messages from a Kafka producer through a Spark Streaming program.
Here is my program:
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
// lines.print()
lines.foreachRDD(rdd => {
  rdd.foreach(message =>
    println(message))
})
The above program runs without errors, but I do not see any messages printed.
Upvotes: 1
Views: 653
Reputation: 15327
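Set your master URL to "local[*]". With plain "local", Spark runs with a single thread, which the Kafka receiver occupies, so no thread is left to process the received data.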
val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[*]")
You can also try calling collect(), which brings each batch back to the driver, and see if you get messages:
lines.foreachRDD { rdd =>
  rdd.collect().foreach(println)
}
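Putting both suggestions together, the whole program might look roughly like the sketch below. It assumes the receiver-based Kafka integration (the spark-streaming-kafka 0.8 artifact) is on the classpath, which is what KafkaUtils.createStream in the question appears to come from. The topicMap helper is a name made up here for illustration, and the ssc.start() / ssc.awaitTermination() calls are not shown in the question, so I am assuming they were just left out of the snippet; without them nothing is ever consumed.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaWordCount {
  // Hypothetical helper: builds the topic -> receiver-thread-count map
  // that createStream expects, e.g. "a,b" with 2 threads.
  def topicMap(topics: String, numThreads: Int): Map[String, Int] =
    topics.split(",").map(_ -> numThreads).toMap

  def main(args: Array[String]): Unit = {
    val Array(zkQuorum, group, topics, numThreads) = args

    // local[*] runs one thread per core. The receiver occupies one thread,
    // so at least two are needed for any batch to be processed.
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val lines = KafkaUtils
      .createStream(ssc, zkQuorum, group, topicMap(topics, numThreads.toInt))
      .map(_._2) // keep the message value, drop the key

    // collect() pulls each batch to the driver before printing.
    lines.foreachRDD { rdd =>
      rdd.collect().foreach(println)
    }

    // Assumed to be missing from the question's snippet:
    // the stream does nothing until the context is started.
    ssc.start()
    ssc.awaitTermination()
  }
}
```

On a single-core machine local[*] still gives only one thread, so "local[2]" would be the safer choice there. collect() is fine for debugging small batches but pulls everything into driver memory, so it is probably not what you want for real volumes.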
Upvotes: 1