Reputation: 287
I am trying to read the saved Spark Streaming offsets into my consumer, but I cannot get it to work.
Here is my code:
val dfoffset = hiveContext.sql(s"select * from $db")
dfoffset.show()
val dfoffsetArray = dfoffset.collect()
println("printing array of data")
dfoffsetArray.foreach(println)
val fromOffsets = collection.mutable.Map[TopicAndPartition, Long]()
for (i <- dfoffsetArray) {
  val topicAndPartition = (TopicAndPartition(i(1).toString, i(0).toString.toInt) -> (i(2).toString.toLong))
  fromOffsets += topicAndPartition
}
val kafkaParams = Map[String, String]("bootstrap.servers" -> serverName, "group.id" -> "test")
val topics = Array(topicName).toSet
//stuck here
var directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
directKafkaStream.foreachRDD(rdd1 => { ..
Here is the output from showing the dataframe
+----------------+----------+--------------+
|partition_number|topic_name|current_offset|
+----------------+----------+--------------+
|               0|TOPIC_NAME|          4421|
+----------------+----------+--------------+
Any help is greatly appreciated.
I am using Spark 1.6, Scala 2.10.5, and Kafka 10.
Upvotes: 0
Views: 1060
Reputation: 279
As the official documentation for KafkaUtils.createDirectStream shows, you should pass fromOffsets as the 3rd parameter of createDirectStream (and don't forget the 4th parameter, messageHandler).
The fromOffsets parameter is supposed to be a collection.immutable.Map[TopicAndPartition, Long]. In Scala we usually prefer immutable collections over mutable ones where possible.
You can transform dfoffsetArray into an immutable.Map[TopicAndPartition, Long] as follows:
val fromOffsets = dfoffsetArray.map { i =>
  TopicAndPartition(i(1).toString, i(0).toString.toInt) -> (i(2).toString.toLong)
}.toMap
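If you would rather keep the mutable map from the question, calling .toMap on it also gives an immutable copy. Here is a minimal sketch of both routes in plain Scala. The TopicAndPartition case class is only a stand-in for kafka.common.TopicAndPartition so that it runs without Kafka on the classpath, and the object name and the tuple rows are made up for illustration (they mirror the column order in the question).

```scala
import scala.collection.mutable

// Stand-in for kafka.common.TopicAndPartition (assumption: same two fields),
// so this sketch compiles without the Kafka jars.
final case class TopicAndPartition(topic: String, partition: Int)

object OffsetMapSketch {
  // Hypothetical rows as (partition_number, topic_name, current_offset).
  val rows: Seq[(Int, String, Long)] = Seq((0, "TOPIC_NAME", 4421L))

  // Route 1: map each row to a pair and build the immutable Map directly.
  val direct: Map[TopicAndPartition, Long] =
    rows.map { case (partition, topic, offset) =>
      TopicAndPartition(topic, partition) -> offset
    }.toMap

  // Route 2: fill a mutable map as in the question, then take an immutable copy.
  val building = mutable.Map[TopicAndPartition, Long]()
  for ((partition, topic, offset) <- rows) {
    building += TopicAndPartition(topic, partition) -> offset
  }
  val snapshot: Map[TopicAndPartition, Long] = building.toMap
}
```

Both values have the type Map[TopicAndPartition, Long], which is what the fromOffsets parameter expects.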
The messageHandler has type (MessageAndMetadata[K, V]) ⇒ R and turns each incoming message (key, value, and metadata) into the record type of your stream. You can define a simple handler as follows:
val messageHandler =
(mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
Then your createDirectStream call will look like this:
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,
(String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
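Putting the pieces together, here is a rough sketch of the whole thing with the imports it needs, assuming the Spark 1.6 spark-streaming-kafka artifact (the Kafka 0.8 direct API) is on the classpath. The object name, the create helper and its brokers parameter are made up for illustration, and the column positions follow the table shown in the question.

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.sql.Row
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamFromSavedOffsets {
  // Hypothetical helper: `rows` is the collected offset table (dfoffsetArray in the question),
  // `brokers` is the broker list (serverName in the question).
  def create(ssc: StreamingContext, rows: Array[Row], brokers: String): InputDStream[(String, String)] = {
    val kafkaParams = Map[String, String]("bootstrap.servers" -> brokers, "group.id" -> "test")

    // Columns assumed to be: 0 = partition_number, 1 = topic_name, 2 = current_offset.
    val fromOffsets: Map[TopicAndPartition, Long] = rows.map { r =>
      TopicAndPartition(r(1).toString, r(0).toString.toInt) -> r(2).toString.toLong
    }.toMap

    // Keep only the key and the value of each message.
    val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, fromOffsets, messageHandler)
  }
}
```

Note that this overload takes no topics set. The topics and partitions to read come from the keys of fromOffsets.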
Now you are free to apply transformations to your stream. Happy streaming!
I learned this from this article months ago. Maybe you will find it helpful.
Upvotes: 1