rsmin

Reputation: 21

Kafka createDirectStream in Spark Streaming

I'm trying the example code from the Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher). The code runs without any error, but I never receive any records. If I run kafka-console-consumer.sh --from-beginning, I can get records. Does anyone know the reason? My code is below:
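(For reference, the console-consumer invocation is essentially the following, with the broker address and topic taken from the code below; the exact command I ran may differ slightly:)

kafka-console-consumer.sh --bootstrap-server 221.181.73.44:19092 --topic connect-test --from-beginning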

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val broker = "221.181.73.44:19092"
val topics = Array("connect-test")
val groupid = "SparkStreamingLoad3"

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> broker,
  "group.id" -> groupid,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "auto.offset.reset" -> "earliest", // earliest | latest
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// ssc is the StreamingContext created earlier
val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

stream.print()

ssc.start()
ssc.awaitTermination()

My SBT build is:

version := "1.0"

scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming-kafka-0-10_2.10" % "2.1.0",
  "org.apache.spark" % "spark-core_2.10" % "2.1.0",
  "org.apache.spark" % "spark-streaming_2.10" % "2.1.0",
  "org.apache.kafka" % "kafka_2.10" % "0.10.2.1"
)

Thanks!

Upvotes: 1

Views: 2167

Answers (2)

rsmin

Reputation: 21

I finally got the issue solved. Here is the answer:

  1. The data in the topic was generated by the console producer, so I assumed it would arrive as [String, String]. However, the records are actually in the format [Array[Byte], Array[Byte]], not [String, String]. So if I use StringDeserializer, no data is received.

  2. I learned this from the console-consumer source code, whose write method has the signature writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit.

The keys/values in the records can be null. In my case, all keys are null. I use the following code to get the data:

val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte]](
  ssc, PreferConsistent, Subscribe[Array[Byte], Array[Byte]](topics, kafkaParams))

stream.map(record =>
    new String(Option(record.key()).getOrElse("null".getBytes)) +
    "|||delimiter|||" +
    new String(Option(record.value()).getOrElse("null".getBytes)))
  .print()
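For this to work, the deserializers in kafkaParams also have to match the byte-array key/value types. A minimal sketch of the adjusted map, keeping the broker and group id from the question:

import org.apache.kafka.common.serialization.ByteArrayDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> broker,
  "group.id" -> groupid,
  // byte-array deserializers to match createDirectStream[Array[Byte], Array[Byte]]
  "key.deserializer" -> classOf[ByteArrayDeserializer],
  "value.deserializer" -> classOf[ByteArrayDeserializer],
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)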

Upvotes: 1

Paul Leclercq

Reputation: 1018

val broker = "221.181.73.44:19092"

The default Kafka port is 9092, so this might be the problem.

"auto.offset.reset" -> "earliest" and "enable.auto.commit" -> false should always make your read from the beginning of your topic's logs, as your offsets are not stored anywhere. So there is no problems with that.

Also, can we see the full command you use for kafka-console-consumer.sh?

Upvotes: 0
