Reputation: 11
I am new to Kafka and am trying to implement Kafka consumer logic in Spark 2. When I run all my code in the shell and start the streaming context, it shows nothing.
I have read many posts on Stack Overflow, but none of them helped. I even downloaded all the dependency jars from Maven and tried again, but it still shows nothing.
Spark version: 2.2.0, Scala version: 2.11.8. The jars I downloaded are kafka-clients-2.2.0.jar and spark-streaming-kafka-0-10_2.11-2.2.0.jar,
but I still face the same issue.
Please find the code snippet below:
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.kafka010.{KafkaUtils, ConsumerStrategies, LocationStrategies}
val brokers = "host1:port, host2:port"
val groupid = "default"
val topics = "kafka_sample"
val topicset = topics.split(",").toSet
val ssc = new StreamingContext(sc, Seconds(2))
val kafkaParams = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
  ConsumerConfig.GROUP_ID_CONFIG -> groupid,
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
)
val msg = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topicset, kafkaParams)
)
msg.foreachRDD { rdd =>
  rdd.collect().foreach(println)
}
ssc.start()
I expect Spark Streaming to start consuming and printing messages, but it doesn't do anything. What mistake have I made here? Or is this a known issue?
Upvotes: 0
Views: 458
Reputation: 688
After ssc.start(), call ssc.awaitTermination() in your code. I have written up all these steps as a simple example in a blog post, with working code on GitHub. Please refer to: http://softwaredevelopercentral.blogspot.com/2018/10/spark-streaming-and-kafka-integration.html
Upvotes: 0
Reputation: 1751
The driver will sit idle unless you call ssc.awaitTermination()
at the end. Also, if you're using spark-shell, note that it's not a good tool for streaming jobs.
Please use an interactive tool like Zeppelin or Spark Notebook for working with streams, or build your app as a jar file and deploy it.
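For the jar route, here is a minimal sketch of the question's code as a standalone app that blocks on awaitTermination(). The object name KafkaSampleConsumer is hypothetical, the broker string is the placeholder from the question, and it assumes the job is launched with spark-submit with the spark-streaming-kafka-0-10 jar on the classpath. It also maps each record to its value before collect(), because ConsumerRecord is not serializable.

```scala
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// Hypothetical object name; submit the packaged jar with spark-submit.
object KafkaSampleConsumer {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("KafkaSampleConsumer")
    val ssc = new StreamingContext(conf, Seconds(2))

    val kafkaParams = Map[String, Object](
      // Placeholder broker list from the question; replace with real host:port pairs.
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "host1:port,host2:port",
      ConsumerConfig.GROUP_ID_CONFIG -> "default",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    val msg = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("kafka_sample"), kafkaParams)
    )

    msg.foreachRDD { rdd =>
      // Extract the String payload on the executors, then print it on the driver.
      rdd.map(record => record.value).collect().foreach(println)
    }

    ssc.start()            // start receiving and processing batches
    ssc.awaitTermination() // keep the driver alive until the context stops or fails
  }
}
```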
Also, if you're just trying out Spark Streaming, Structured Streaming would be a better choice, as it is quite easy to play with.
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
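As a rough sketch of what the same consumer could look like with Structured Streaming: this assumes the spark-sql-kafka-0-10 package matching your Spark version is on the classpath, reuses the placeholder brokers and the topic name from the question, and uses a hypothetical object name. It prints each message value to the console.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical object name; assumes the spark-sql-kafka-0-10 package is available.
object KafkaSampleStructured {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("KafkaSampleStructured")
      .getOrCreate()

    // Read the topic as an unbounded table; key and value arrive as binary columns.
    val lines = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port,host2:port") // placeholder brokers
      .option("subscribe", "kafka_sample")
      .load()
      .selectExpr("CAST(value AS STRING) AS value")

    // Write every new row to stdout as it arrives.
    val query = lines.writeStream
      .format("console")
      .outputMode("append")
      .start()

    query.awaitTermination() // block until the query is stopped or fails
  }
}
```

Here too the final awaitTermination() call is what keeps the driver running.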
Upvotes: 1