senthil kumar p

Reputation: 546

Unable to consume Kafka messages through Spark Streaming

I am trying to consume messages from a Kafka producer with a Spark Streaming program.

Here is my program:

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    // lines.print()
    lines.foreachRDD { rdd =>
      rdd.foreach(message => println(message))
    }

The program runs without errors, but no messages are printed.

Upvotes: 1

Views: 653

Answers (1)

abaghel

Reputation: 15327

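Set the master URL to "local[*]". With plain "local", Spark runs with a single thread, and the Kafka receiver occupies it, so no thread is left to process the received messages.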

    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[*]")

You can also call collect() to bring each batch back to the driver and print it there, to check whether any messages are arriving:

    lines.foreachRDD { rdd =>
      rdd.collect().foreach(println)
    }
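
Putting both suggestions together, a minimal sketch of the whole program could look like the following. It assumes the receiver-based Kafka integration (the spark-streaming-kafka 0.8 artifact, which provides KafkaUtils.createStream) is on the classpath. The parseTopicMap helper and the usage message are hypothetical additions for illustration. The ssc.start() and ssc.awaitTermination() calls are not shown in the question's excerpt; they are included here because no batch is processed until the context is started.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaWordCount {

  // Hypothetical helper (not in the question): maps each comma-separated
  // topic name to the number of consumer threads to use for it.
  def parseTopicMap(topics: String, numThreads: Int): Map[String, Int] =
    topics.split(",").map(_.trim).filter(_.nonEmpty).map((_, numThreads)).toMap

  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      sys.exit(1)
    }
    val Array(zkQuorum, group, topics, numThreads) = args

    // local[*] uses one thread per available core, so on a multi-core
    // machine the receiver no longer takes the only thread.
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val topicMap = parseTopicMap(topics, numThreads.toInt)
    // createStream yields (key, message) pairs; keep only the message.
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    // collect() pulls each 5-second batch to the driver before printing.
    // This is fine for debugging but not for large batches.
    lines.foreachRDD { rdd =>
      rdd.collect().foreach(println)
    }

    // Nothing is consumed until the streaming context is started.
    ssc.start()
    ssc.awaitTermination()
  }
}
```

If messages still do not appear, it is worth checking that the topic names and the ZooKeeper quorum passed as arguments match what the producer is writing to.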

Upvotes: 1
