Alok

Reputation: 1506

Creating two DStreams from Kafka topics in Spark Streaming not working

In my Spark Streaming application, I am creating two DStreams from two Kafka topics because I need to process the two streams differently. Below is the code example:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaConsumerTest3 {
  var sc: SparkContext = null

  def main(args: Array[String]) {
    // Silence noisy framework logging
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    val Array(zkQuorum, group, topics1, topics2, numThreads) =
      Array("localhost:2181", "group3", "test_topic4", "test_topic5", "5")
    val sparkConf = new SparkConf().setAppName("SparkConsumer").setMaster("local[2]")
    sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(2))

    // Map each topic to the number of consumer threads for its receiver
    val topicMap1 = topics1.split(",").map((_, numThreads.toInt)).toMap
    val topicMap2 = topics2.split(",").map((_, numThreads.toInt)).toMap

    // Each createStream call registers a separate receiver
    val lines2 = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap2).map(_._2)
    val lines1 = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap1).map(_._2)

    lines2.foreachRDD { rdd =>
      rdd.foreach(println)
    }

    lines1.foreachRDD { rdd =>
      rdd.foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

Both topics may or may not have data. In my case, the first topic is not currently receiving data, but the second one is. However, my Spark application is not printing any data, and there is no exception either. Is there anything I am missing, or how do I resolve this issue?

Upvotes: 0

Views: 480

Answers (1)

Alok

Reputation: 1506

Found the issue with the above code. The problem is that the master is set to local[2] while two receivers are registered. Each receiver-based stream pins one thread, so with two receivers there is no thread left to process the batches. Increasing the number of threads solves the problem.
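A minimal sketch of the fix, only changing the master setting (local[4] here is an illustrative choice; you need at least one thread per receiver plus one for processing, and local[*] also works on a multi-core machine):

// Two receiver-based streams pin two threads,
// so at least one more thread is needed to process the batches.
val sparkConf = new SparkConf()
  .setAppName("SparkConsumer")
  .setMaster("local[4]") // was local[2]: both threads were consumed by the receivers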

Upvotes: 0
