Reputation: 29477
I am brand new to Spark & Kafka and am trying to get some Scala code (running as a Spark job) to act as a long-running process (not just a short-lived/scheduled task) and to continuously poll a Kafka broker for messages. When it receives messages, I just want them printed out to the console/STDOUT. Again, this needs to be a long-running process and basically (try to) live forever.
After doing some digging, it seems like a StreamingContext is what I want to use. Here's my best attempt:
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.storage._
import org.apache.spark.streaming.{StreamingContext, Seconds, Minutes, Time}
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder
def createKafkaStream(ssc: StreamingContext, kafkaTopics: String, brokers: String): DStream[(String, String)] = {
  val topicsSet = kafkaTopics.split(",").toSet
  val props = Map(
    "bootstrap.servers" -> brokers,
    "metadata.broker.list" -> brokers,
    "serializer.class" -> "kafka.serializer.StringEncoder",
    "value.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
    "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
    "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
  )
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, props, topicsSet)
}
def processEngine(): StreamingContext = {
  val ssc = new StreamingContext(sc, Seconds(1))
  val topicStream = createKafkaStream(ssc, "mytopic", "my-kafka.example.com:9092").print()
  ssc
}

StreamingContext.getActive.foreach {
  _.stop(stopSparkContext = false)
}
val ssc1 = StreamingContext.getActiveOrCreate(processEngine)
ssc1.start()
ssc1.awaitTermination()
When I run this, I get no exceptions/errors, but nothing seems to happen. I can confirm there are messages on the topic. Any ideas as to where I'm going awry?
Upvotes: 2
Views: 1941
Reputation: 61
Is this your complete code? Where did you create sc? You have to create the SparkContext before the StreamingContext. You can create sc like this:
val conf = new SparkConf().setAppName("SparkConsumer")
val sc = new SparkContext(conf)
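From there the StreamingContext is built on top of that sc; a minimal sketch, reusing the one-second batch interval from the question:
val ssc = new StreamingContext(sc, Seconds(1))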
Also, without awaitTermination, it is very hard to catch and print exceptions that occur during the background data processing. Can you add ssc1.awaitTermination() at the end and see if you get any errors?
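As a sketch of what that looks like (the try/catch is only there to make the failure obvious; awaitTermination by itself will already rethrow any exception from the streaming job on the driver):
ssc1.start()
try {
  // Blocks the driver; exceptions raised by the streaming job are rethrown here
  ssc1.awaitTermination()
} catch {
  case e: Exception =>
    println(s"Streaming job failed: ${e.getMessage}")
    throw e
}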
Upvotes: 1
Reputation: 149518
When you print from inside foreachRDD, the output goes to the worker nodes, not to the driver. I'm assuming you're looking at the driver's console output. You can use DStream.print instead:
val ssc = new StreamingContext(sc, Seconds(1))
val topicStream = createKafkaStream(ssc, "mytopic", "my-kafka.example.com:9092").print()
Also, don't forget to call ssc.awaitTermination() after ssc.start():
ssc.start()
ssc.awaitTermination()
As a side note, I'm assuming you copy-pasted this from an example, but there's no need to use transform on the DStream if you're not actually planning to do anything with the OffsetRange.
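For reference, a sketch of what that transform / OffsetRange pattern looks like with the Kafka direct stream, assuming kafkaStream stands in for the DStream returned by createKafkaStream; it's only worth keeping if you actually consume the offsets:
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

var offsetRanges = Array.empty[OffsetRange]

kafkaStream.transform { rdd =>
  // Capture this batch's offset ranges on the driver before any other operation
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.foreachRDD { _ =>
  // Report where each partition's batch began and ended
  offsetRanges.foreach { o =>
    println(s"${o.topic} ${o.partition} ${o.fromOffset} -> ${o.untilOffset}")
  }
}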
Upvotes: 3