Reputation: 1500
I am trying to write some simple Spark code in Scala.
Here I am getting a DStream. I am able to print this DStream successfully, but when I try to apply any kind of "foreach", "foreachRDD" or "transform" operation on it, my console just freezes during execution. I am not getting any error; the console simply becomes non-responsive until I manually terminate the run in Eclipse. I am attaching the code here. Kindly tell me what I am doing wrong.
My main objective is to apply RDD operations on the DStream, and as far as I know I need to get at the underlying RDDs via the "foreach", "foreachRDD" or "transform" functions in order to do that.
I have already achieved the same thing in Java, but in Scala I am having this problem.
Is anybody else facing the same issue? If not, then kindly help me out. Thanks.
Here is my sample code:
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaStreaming {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
    val Array(zkQuorum, group, topics, numThreads) = args
    val ssc = new StreamingContext("local", "KafkaWordCount", Seconds(2))
    // numThreads consumer threads per topic
    val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // Keep only the message value from each (key, message) pair
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
    val splitLines: DStream[String] = lines.flatMap(_.split("\n"))
    val pairAlarm = splitLines.map { x =>
      // Some code that derives key and value from x
      val alarmPair = new Tuple2(key, value)
      alarmPair
    }
    //pairAlarm.print
    pairAlarm.foreachRDD { x =>
      println("1: " + x.first)
      x.collect // When execution reaches this part it freezes
      println("2: " + x.first)
    }
    ssc.start()
    ssc.awaitTermination()
  }
}
Upvotes: 0
Views: 1494
Reputation: 23881
I don't know if this is your problem, but I had a similar one: my program just stopped printing after several iterations. No exceptions or anything, it simply stopped printing after 5-6 prints.
Changing this:
val ssc = new StreamingContext("local", "KafkaWordCount", Seconds(2))
to this:
val ssc = new StreamingContext("local[2]", "KafkaWordCount", Seconds(2))
solved the problem. Spark Streaming requires at least 2 threads to run locally: receiver-based input DStreams (like the Kafka stream here) each occupy one thread, so with plain local there is no thread left to process the received data. The documentation examples are misleading, as they use just local as well.
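For reference, here is a minimal sketch of the corrected setup. The object name is made up for illustration, and the SparkConf-based constructor is just the equivalent of the master/app-name one used above:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object LocalThreadsSketch {
  def main(args: Array[String]) {
    // "local[2]" gives the local scheduler two threads: one is taken
    // by the Kafka receiver, the other is free to process the batches.
    val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaWordCount")
    val ssc = new StreamingContext(conf, Seconds(2))
    // ... build DStreams and register output operations here ...
    ssc.start()
    ssc.awaitTermination()
  }
}

More generally, with receiver-based streams the master needs strictly more threads than receivers, i.e. local[n] with n greater than the number of receivers.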
Hope this helps!
Upvotes: 3