Reputation: 427
I am trying to read older messages from Kafka with Spark Streaming. However, I am only able to retrieve messages as they are sent in real time (i.e., if I produce new messages while my Spark program is running, then I get those messages).
I am changing my group ID and consumer ID on each run to make sure ZooKeeper isn't simply withholding messages it knows my program has already seen.
Assuming Spark sees the offset in ZooKeeper as -1, shouldn't it read all the old messages in the topic? Am I just misunderstanding the way a Kafka topic can be used? I'm very new to Spark and Kafka, so I can't rule out that I'm just misunderstanding something.
package com.kibblesandbits

import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

import net.liftweb.json._

object KafkaStreamingTest {

  val cfg = new ConfigLoader().load
  val zookeeperHost = cfg.zookeeper.host
  val zookeeperPort = cfg.zookeeper.port
  val zookeeper_kafka_chroot = cfg.zookeeper.kafka_chroot

  implicit val formats = DefaultFormats

  // Pass-through for now; will eventually parse the JSON payload.
  def parser(json: String): String = json

  def main(args: Array[String]) {
    val zkQuorum = "test-spark02:9092"
    val group = "myGroup99"
    val topic = Map("testtopic" -> 1)

    val sparkContext = new SparkContext("local[3]", "KafkaConsumer1_New")
    val ssc = new StreamingContext(sparkContext, Seconds(3))
    val json_stream = KafkaUtils.createStream(ssc, zkQuorum, group, topic)

    // Keep only the message value from each (key, value) pair.
    val gp = json_stream.map(_._2).map(parser)
    gp.saveAsTextFiles("/tmp/sparkstreaming/mytest", "json")

    ssc.start()
    ssc.awaitTermination()
  }
}
When running this, I see the following message, so I am confident the problem isn't that the consumer is skipping messages because an offset is already set.
14/12/05 13:34:08 INFO ConsumerFetcherManager: [ConsumerFetcherManager-1417808045047] Added fetcher for partitions ArrayBuffer([[testtopic,0], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092] , [[testtopic,1], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092] , [[testtopic,2], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092] , [[testtopic,3], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092] , [[testtopic,4], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092] )
Then, if I produce 1000 new messages, I can see those 1000 messages saved in my temp directory. But I don't know how to read the existing messages, which should (at this point) number in the tens of thousands.
Upvotes: 6
Views: 4338
Reputation: 37435
Use the alternative factory method on KafkaUtils that lets you provide a configuration to the Kafka consumer:
def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    topics: Map[String, Int],
    storageLevel: StorageLevel
): ReceiverInputDStream[(K, V)]
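For plain String messages, the type parameters are the key and value types plus their decoders. A sketch of the call shape, assuming kafka.serializer.StringDecoder for both and the usual replicated receiver storage level (kafkaParams is the configuration map built in the next step):

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel

val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Map("testtopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)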
Then build a map with your Kafka configuration and add the parameter "auto.offset.reset" set to "smallest":
val kafkaParams = Map[String, String](
  "zookeeper.connect" -> zkQuorum,
  "group.id" -> groupId,
  "zookeeper.connection.timeout.ms" -> "10000",
  "auto.offset.reset" -> "smallest"
)
Provide that config to the factory method above. "auto.offset.reset" -> "smallest" tells the consumer to start from the smallest offset available in your topic. Note that this setting only takes effect when the consumer group has no offset already committed in ZooKeeper, so use a fresh group.id (as you are already doing) whenever you want to re-read a topic from the beginning.
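Applied to the code in the question, the stream creation would become something like the sketch below (assuming plain String keys and values, with ssc, zkQuorum, and group as defined there):

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel

val kafkaParams = Map[String, String](
  "zookeeper.connect" -> zkQuorum,
  "group.id" -> group,                // a group with no committed offsets
  "zookeeper.connection.timeout.ms" -> "10000",
  "auto.offset.reset" -> "smallest"   // start from the earliest available offset
)

val json_stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Map("testtopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)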
Upvotes: 8