Reputation: 21
I'm trying the example code from the Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher). The code runs without any errors, but I never receive any records. If I run kafka-console-consumer.sh --from-beginning, I do get records. Does anyone know the reason? My code is below:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val broker = "221.181.73.44:19092"
val topics = Array("connect-test")
val groupid = "SparkStreamingLoad3"

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> broker,
  "group.id" -> groupid,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "auto.offset.reset" -> "earliest", // earliest | latest
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// ssc is the StreamingContext created earlier
val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
stream.print()

ssc.start()
ssc.awaitTermination()
My SBT build is:
version := "1.0"

scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming-kafka-0-10_2.10" % "2.1.0",
  "org.apache.spark" % "spark-core_2.10" % "2.1.0",
  "org.apache.spark" % "spark-streaming_2.10" % "2.1.0",
  "org.apache.kafka" % "kafka_2.10" % "0.10.2.1"
)
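(Aside: the integration guide I'm following notes that the spark-streaming-kafka-0-10 artifact already pulls in compatible org.apache.kafka dependencies transitively, and that manually adding ones of a different version can be incompatible in hard-to-diagnose ways. If that applies here, a trimmed build might look like:

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "2.1.0",
  "org.apache.spark" % "spark-streaming_2.10" % "2.1.0",
  "org.apache.spark" % "spark-streaming-kafka-0-10_2.10" % "2.1.0"
)
)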
Thanks!
Upvotes: 1
Views: 2167
Reputation: 21
Finally, I got the issue solved. Here is the answer:
The data in the topic was produced by the console producer as a list of strings, but the records actually come through as [Array[Byte], Array[Byte]], not [String, String]. So when I used StringDeserializer, no data was received.
I learned this from the console-consumer source code, whose write method has the signature writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit.
Also, the keys/values in the stream can be null; in my case, all the keys are null. I use the following code to get the data:
val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte]](
  ssc, PreferConsistent, Subscribe[Array[Byte], Array[Byte]](topics, kafkaParams))

stream.map(record =>
    new String(Option(record.key()).getOrElse("null".getBytes)) +
    "|||delimiter|||" +
    new String(Option(record.value()).getOrElse("null".getBytes)))
  .print()
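One thing the snippet above doesn't show: the deserializers in kafkaParams have to match the new type parameters, otherwise the values the consumer returns won't match the declared types at runtime. A minimal sketch of the adjusted params, with everything except the deserializers unchanged from the question:

import org.apache.kafka.common.serialization.ByteArrayDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> broker,
  "group.id" -> groupid,
  "key.deserializer" -> classOf[ByteArrayDeserializer],   // was StringDeserializer
  "value.deserializer" -> classOf[ByteArrayDeserializer], // was StringDeserializer
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)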
Upvotes: 1
Reputation: 1018
val broker = "221.181.73.44:19092"
The default port is 9092; that might be the problem.
"auto.offset.reset" -> "earliest"
and "enable.auto.commit" -> false
should always make your read from the beginning of your topic's logs, as your offsets are not stored anywhere. So there is no problems with that.
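For completeness, here is a minimal sketch to verify that behavior outside of Spark with a plain KafkaConsumer, reusing the broker, topic, and settings from your question (the "-debug" group id suffix is made up so that no committed offsets exist):

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

val props = new Properties()
props.put("bootstrap.servers", "221.181.73.44:19092")
props.put("group.id", "SparkStreamingLoad3-debug") // fresh group => no stored offsets
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("auto.offset.reset", "earliest")
props.put("enable.auto.commit", "false")

val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
consumer.subscribe(Collections.singletonList("connect-test"))
val records = consumer.poll(10000) // pre-2.0 poll(long) signature
println(s"fetched ${records.count()} records")
consumer.close()

If this prints a non-zero count, the topic and settings are fine and the problem is on the Spark side.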
Also, can we see the full command you use for kafka-console-consumer.sh?
Upvotes: 0