user1523567

Reputation: 59

Initialize createDirectStream with MessageHandler

Below is the existing code that I use to create a DStream[String] from Kafka with Spark Streaming:

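import kafka.serializer.StringDecoder
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

val topicsSet = kafkaTopicsName.split(",").toSet
// Each element of the direct stream is a (key, value) pair; keep only the value
val messages: DStream[String] = KafkaUtils
  .createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, topicsSet)
  .map(_._2)
messages.foreachRDD { rdd =>
  // . . . . .
}
streamingContext.start()
streamingContext.awaitTermination()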

I wanted to switch from:

def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V]](ssc: StreamingContext, 
    kafkaParams: Map[String, String], topics: Set[String])(implicit arg0: ClassTag[K], 
    arg1: ClassTag[V], arg2: ClassTag[KD], arg3: ClassTag[VD]): InputDStream[(K, V)]

to

def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R]
    (jssc: JavaStreamingContext, keyClass: Class[K], valueClass: Class[V],   
     keyDecoderClass: Class[KD], valueDecoderClass: Class[VD], recordClass: Class[R], 
     kafkaParams: Map[String, String], fromOffsets: Map[TopicAndPartition, Long], 
    messageHandler: Function[MessageAndMetadata[K, V], R]): JavaInputDStream[R]

I want to do this so that I can manage the offsets of the Spark consumer myself. I have already figured out how to read the offset ranges from ZooKeeper and pass them to createDirectStream as the starting offsets.

However, I am not sure how to pass messageHandler and the class R. What exactly do these two parameters do? I would really appreciate a sample code snippet in Scala for the method above.
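The two parameters work together: messageHandler is called once for every Kafka message fetched, receiving that message's MessageAndMetadata (topic, partition, offset, key and value), and whatever it returns becomes one element of the resulting stream. R is simply the handler's return type. In the Scala overload of KafkaUtils.createDirectStream, R is the fifth type parameter and the handler is a plain MessageAndMetadata[K, V] => R; the Java overload additionally takes recordClass: Class[R] because Java code cannot supply the implicit ClassTag values the Scala signature relies on. The sketch below models this in plain Scala without Spark or Kafka; FakeMessageAndMetadata, Handler and applyHandler are hypothetical stand-ins, not library code.

```scala
// Hypothetical stand-in for kafka.message.MessageAndMetadata[K, V], keeping
// only the fields a handler typically reads. This is NOT the real Kafka class.
final case class FakeMessageAndMetadata[K, V](
    topic: String, partition: Int, offset: Long, key: K, message: V)

object MessageHandlerDemo {
  // A message handler is just a function from message-plus-metadata to R.
  // R is whatever type each element of the resulting stream should have.
  type Handler[K, V, R] = FakeMessageAndMetadata[K, V] => R

  // R = String: keep only the message value (same result as .map(_._2)).
  val valueOnly: Handler[String, String, String] = mmd => mmd.message

  // R = (String, Int, Long, String): also keep where the message came from,
  // so offsets stay visible downstream.
  val withPosition: Handler[String, String, (String, Int, Long, String)] =
    mmd => (mmd.topic, mmd.partition, mmd.offset, mmd.message)

  // Hypothetical model of what the direct stream does per batch:
  // apply the handler to every fetched message.
  def applyHandler[K, V, R](batch: Seq[FakeMessageAndMetadata[K, V]],
                            handler: Handler[K, V, R]): Seq[R] =
    batch.map(handler)

  // Two made-up messages from two partitions of one topic.
  val batch: Seq[FakeMessageAndMetadata[String, String]] = Seq(
    FakeMessageAndMetadata("events", 0, 41L, "k1", "hello"),
    FakeMessageAndMetadata("events", 1, 7L, "k2", "world"))

  def main(args: Array[String]): Unit = {
    println(applyHandler(batch, valueOnly))
    println(applyHandler(batch, withPosition))
  }
}
```

With R = String the result matches the old .map(_._2) stream; choosing a tuple for R keeps topic, partition and offset next to each value, which helps when you manage offsets yourself.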

Upvotes: 1

Views: 1321

Answers (2)

user1523567

Reputation: 59

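I figured out one solution, using the Scala overload that takes the starting offsets and a message handler:

// topicMap must be a Map[TopicAndPartition, Long] holding the starting offset of each partition
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
  streamingContext, kafkaParams, topicMap, (mmd: MessageAndMetadata[String, String]) => mmd.message())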
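The handler above returns only the message value, so offsets are no longer visible downstream. If you want to save offsets yourself, one option is a handler whose R is a tuple such as (mmd.topic, mmd.partition, mmd.offset, mmd.message()). The sketch below is plain Scala with no Spark or Kafka dependency: the hypothetical Rec case class stands in for that tuple, and (String, Int) stands in for TopicAndPartition. It computes the offset each partition should resume from, one past the highest offset processed. With the real API you can also skip the tuple and read rdd.asInstanceOf[HasOffsetRanges].offsetRanges inside foreachRDD, where untilOffset is already the exclusive next offset.

```scala
// Hypothetical record shape: what a tuple-returning handler such as
//   (mmd: MessageAndMetadata[String, String]) =>
//     (mmd.topic, mmd.partition, mmd.offset, mmd.message())
// would emit, modeled as a case class.
final case class Rec(topic: String, partition: Int, offset: Long, value: String)

object NextOffsets {
  // For each (topic, partition), the offset to start from on the next run
  // is one past the highest offset already processed.
  def nextOffsets(processed: Seq[Rec]): Map[(String, Int), Long] =
    processed
      .groupBy(r => (r.topic, r.partition))
      .map { case (tp, recs) => tp -> (recs.map(_.offset).max + 1L) }

  // Merge new positions into the previously stored ones, so partitions
  // that received no messages in this batch keep their old position.
  def merge(stored: Map[(String, Int), Long],
            latest: Map[(String, Int), Long]): Map[(String, Int), Long] =
    stored ++ latest
}
```

The merged map, converted to Map[TopicAndPartition, Long], is what you would store (for example in ZooKeeper) and pass as fromOffsets on restart.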

Upvotes: 1

ankur jha

Reputation: 11

You can use something like this:

KafkaUtils.createDirectStream(sparkStreamContext, String.class, String.class, StringDecoder.class, StringDecoder.class,
    String.class, kafkaParams, fromOffsets, (Function<MessageAndMetadata<String, String>, String>) MessageAndMetadata::message);

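This returns a JavaInputDStream<String> containing your message values, starting from the offsets given in fromOffsets. Here Function is org.apache.spark.api.java.function.Function, and fromOffsets is a java.util.Map<TopicAndPartition, Long>.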

Upvotes: 0
