Amitabh Ranjan

Reputation: 1500

How to apply RDD function on DStream while writing code in scala

I am trying to write a simple Spark Streaming program in Scala.

Here I get a DStream, and I can print it successfully. But when I try to apply any kind of "foreach", "foreachRDD" or "transform" function on this DStream, my console just freezes during execution. I don't get any error; the console simply becomes non-responsive until I manually terminate the run from Eclipse. I am attaching the code here. Kindly tell me what I am doing wrong.

My main objective is to apply RDD operations on a DStream, and as far as I know that means getting at the RDDs behind the DStream by using "foreach", "foreachRDD" or "transform".
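For context, here is roughly what I mean by the two approaches (a minimal sketch, assuming a DStream[String] named lines; the lambdas are just placeholders):

// transform: apply an arbitrary RDD-to-RDD function; the result is again a DStream
val upper: DStream[String] = lines.transform(rdd => rdd.map(_.toUpperCase))

// foreachRDD: run a side-effecting action on the RDD behind each batch
lines.foreachRDD(rdd => println("records in this batch: " + rdd.count()))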

I have already achieved the same thing in Java, but in Scala I am running into this problem.

Is anybody else facing the same issue? Kindly help me out. Thanks.

Here is my sample code:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaStreaming {
    def main(args: Array[String]) {
        if (args.length < 4) {
            System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
            System.exit(1)
        }

        val Array(zkQuorum, group, topics, numThreads) = args
        val ssc = new StreamingContext("local", "KafkaWordCount", Seconds(2))
        val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
        val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
        val splitLines: DStream[String] = lines.flatMap(_.split("\n"))

        val pairAlarm = splitLines.map { x =>
            // Some code that derives a key and a value from each line
            val alarmPair = (key, value)
            alarmPair
        }

        // pairAlarm.print

        pairAlarm.foreachRDD { x =>
            println("1 : " + x.first)
            x.collect // When the execution reaches this part it freezes
            println("2: " + x.first)
        }

        ssc.start()
        ssc.awaitTermination()
    }
}

Upvotes: 0

Views: 1494

Answers (1)

serejja

Reputation: 23881

I don't know if this is your problem, but I had a similar one: my program simply stopped printing after several iterations. No exceptions or anything; it just stopped printing after 5-6 batches.

Changing this:

val ssc = new StreamingContext("local", "KafkaWordCount", Seconds(2))

to this:

val ssc = new StreamingContext("local[2]", "KafkaWordCount", Seconds(2))

solved the problem. Spark Streaming needs at least two threads to run locally (one for the receiver and one to process the data), and the documentation examples are misleading, since some of them use just "local" as well.
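For reference, the same fix with an explicit SparkConf (a minimal sketch; the app name is arbitrary):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// "local[2]" gives one thread to the Kafka receiver and one to the processing
val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaWordCount")
val ssc = new StreamingContext(conf, Seconds(2))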

Hope this helps!

Upvotes: 3
