Abhishek Allamsetty

Reputation: 11

Why does the Kafka consumer code freeze when I start the Spark stream?

I am new to Kafka and am trying to implement Kafka consumer logic in Spark 2. When I run all my code in the shell and start the streaming, it shows nothing.

I have viewed many posts on Stack Overflow, but nothing helped. I even downloaded all the dependency jars from Maven and tried running with them, but it still shows nothing.

Spark version: 2.2.0, Scala version: 2.11.8. The jars I downloaded are kafka-clients-2.2.0.jar and spark-streaming-kafka-0-10_2.11-2.2.0.jar, but I still face the same issue.

Please find the code snippet below:

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.kafka010.{KafkaUtils, ConsumerStrategies, LocationStrategies}

val brokers = "host1:port, host2:port"
val groupid = "default"
val topics = "kafka_sample"
val topicset = topics.split(",").toSet

val ssc = new StreamingContext(sc, Seconds(2))

val kafkaParams = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
  ConsumerConfig.GROUP_ID_CONFIG -> groupid,
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
)

val msg = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topicset, kafkaParams)
)

msg.foreachRDD { rdd =>
  rdd.collect().foreach(println)
}
ssc.start()

I am expecting Spark Streaming to start, but it doesn't do anything. What mistake have I made here? Or is this a known issue?

Upvotes: 0

Views: 458

Answers (2)

Aj Tech Developer

Reputation: 688

  1. After ssc.start(), call ssc.awaitTermination() in your code (see the sketch after this list).
  2. For testing, write your code in a Main object and run it in an IDE like IntelliJ.
  3. You can use the command shell to publish messages from the Kafka producer (a producer sketch follows the blog link below).
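
A minimal sketch of points 1 and 2, reusing the broker, group id, and topic names from the question (they are placeholders, not verified against a real cluster):

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object KafkaStreamMain {
  def main(args: Array[String]): Unit = {
    // Standalone app: build our own context instead of relying on the shell's `sc`
    val conf = new SparkConf().setAppName("KafkaStreamMain").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))

    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "host1:port,host2:port",
      ConsumerConfig.GROUP_ID_CONFIG -> "default",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("kafka_sample"), kafkaParams)
    )

    // Print the message values of each micro-batch on the driver
    stream.foreachRDD { rdd => rdd.map(_.value).collect().foreach(println) }

    ssc.start()
    ssc.awaitTermination() // block the driver so the streaming job keeps running
  }
}

Without the awaitTermination() call the main method returns right after start(), so the application exits before any batch is processed.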

I have written all these steps in a simple example in a blog post, with working code on GitHub. Please refer to: http://softwaredevelopercentral.blogspot.com/2018/10/spark-streaming-and-kafka-integration.html
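
For point 3, if you prefer to publish the test messages from code rather than the console producer, here is a hedged sketch using the kafka-clients producer API (broker address and topic name are the assumed placeholders from the question):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object TestProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "host1:port,host2:port")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

    val producer = new KafkaProducer[String, String](props)
    // Send a few test messages to the topic the consumer subscribes to
    (1 to 5).foreach { i =>
      producer.send(new ProducerRecord[String, String]("kafka_sample", s"key-$i", s"message-$i"))
    }
    producer.flush()
    producer.close()
  }
}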

Upvotes: 0

Piyush Patel

Reputation: 1751

The driver will sit idle unless you call ssc.awaitTermination() at the end. Also, spark-shell is not a good tool for streaming jobs. Use an interactive tool like Zeppelin or Spark Notebook for interacting with streaming, or build your app as a jar file and deploy it.

Also, if you're trying out Spark streaming, Structured Streaming would be a better choice, as it is quite easy to work with.

http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
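
A minimal Structured Streaming sketch of the same consumer (this assumes the spark-sql-kafka-0-10 package is on the classpath; broker and topic names are the placeholders from the question):

import org.apache.spark.sql.SparkSession

object StructuredKafkaRead {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("StructuredKafkaRead")
      .master("local[2]")
      .getOrCreate()

    // Read the topic as a streaming DataFrame; key/value arrive as binary
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port,host2:port")
      .option("subscribe", "kafka_sample")
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    // Print each micro-batch to the console and block until terminated
    df.writeStream
      .format("console")
      .start()
      .awaitTermination()
  }
}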

Upvotes: 1
