javadev

Reputation: 287

Spark 1.6 Streaming consumer reading Kafka offsets stuck at createDirectStream

I am trying to read stored Kafka offsets into my Spark Streaming consumer, but I cannot seem to do it correctly.

Here is my code.

val dfoffset = hiveContext.sql(s"select * from $db")
dfoffset.show()
val dfoffsetArray = dfoffset.collect()
println("printing array of data")
dfoffsetArray.foreach(println)
val fromOffsets = collection.mutable.Map[TopicAndPartition, Long]()
for (i <- dfoffsetArray) {
  val topicAndPartition = (TopicAndPartition(i(1).toString, i(0).toString.toInt) -> (i(2).toString.toLong))
  fromOffsets += topicAndPartition
}

val kafkaParams = Map[String, String]("bootstrap.servers" -> serverName, "group.id" -> "test")
val topics = Array(topicName).toSet
//stuck here 
var directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

directKafkaStream.foreachRDD(rdd1 => { ..

Here is the output from showing the dataframe

+----------------+----------+--------------+
|partition_number|topic_name|current_offset|
+----------------+----------+--------------+
|               0|TOPIC_NAME|          4421|
+----------------+----------+--------------+

Any help is greatly appreciated.

I am using Spark 1.6, Scala 2.10.5, Kafka 0.10

Upvotes: 0

Views: 1060

Answers (1)

Ire

Reputation: 279

As the official documentation for KafkaUtils.createDirectStream shows, you should pass fromOffsets as the third parameter of createDirectStream (and don't forget the fourth parameter, messageHandler).
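
For reference, the relevant overload looks roughly like this (simplified, with the ClassTag implicits on the type parameters omitted):

def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    fromOffsets: Map[TopicAndPartition, Long],
    messageHandler: MessageAndMetadata[K, V] => R
): InputDStream[R]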

The fromOffsets parameter is supposed to be a collection.immutable.Map[TopicAndPartition, Long]; in Scala we usually prefer immutable collections over mutable ones where possible.
You can transform dfoffsetArray into an immutable Map[TopicAndPartition, Long] as follows:

val fromOffsets = dfoffsetArray.map( i =>
  TopicAndPartition(i(1).toString, i(0).toString.toInt) -> (i(2).toString.toLong)
).toMap

The messageHandler has type (MessageAndMetadata[K, V]) ⇒ R and decides how the key and value of each message are extracted. You can define a simple handler like this:

val messageHandler =
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

Then your createDirectStream will look like...

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,
  (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
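
Putting the pieces together, here is a minimal sketch with the imports the snippets above need; the serverName, topicName, ssc and dfoffsetArray values are reused from your question, and the foreachRDD body is just an illustrative placeholder:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Starting offsets built from the rows read out of Hive
// (columns: partition_number, topic_name, current_offset).
val fromOffsets = dfoffsetArray.map { i =>
  TopicAndPartition(i(1).toString, i(0).toString.toInt) -> i(2).toString.toLong
}.toMap

// Keep both key and value of each Kafka message.
val messageHandler =
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

val kafkaParams = Map[String, String](
  "bootstrap.servers" -> serverName,
  "group.id" -> "test")

// Each partition now starts reading at the offset loaded from Hive.
val directKafkaStream =
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,
    (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)

directKafkaStream.foreachRDD { rdd =>
  // placeholder transformation; replace with your own processing
  println(s"records in batch: ${rdd.count()}")
}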

Now you are free to do some transformation to your stream. Happy streaming!


I learned this from this article a few months ago. Maybe you will find it helpful.

Upvotes: 1
