Reputation: 885
I'm trying to create a Kafka direct stream in my Spark Streaming module, supplying the starting offsets externally, but it fails with the compilation error below.
Here is the code that creates the Kafka direct stream:
val kafkaParams = Map("metadata.broker.list" -> "kafka.brokers")
// testing only
val fromOffsets: Map[TopicPartition, Long] = Map[TopicPartition, Long]()
val kafkaStream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, Array[Byte]]
(ssc, kafkaParams, fromOffsets, (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => mmd.message())
Here is the compilation error I'm running into. Any ideas or pointers?
ambiguous reference to overloaded definition,
both method createDirectStream in object KafkaUtils of type (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext, keyClass: Class[Array[Byte]], valueClass: Class[Array[Byte]], keyDecoderClass: Class[kafka.serializer.DefaultDecoder], valueDecoderClass: Class[kafka.serializer.DefaultDecoder], recordClass: Class[Array[Byte]], kafkaParams: java.util.Map[String,String], fromOffsets: java.util.Map[kafka.common.TopicAndPartition,Long], messageHandler: org.apache.spark.api.java.function.Function[kafka.message.MessageAndMetadata[Array[Byte],Array[Byte]],Array[Byte]])org.apache.spark.streaming.api.java.JavaInputDStream[Array[Byte]]
and method createDirectStream in object KafkaUtils of type (ssc: org.apache.spark.streaming.StreamingContext, kafkaParams: Map[String,String], fromOffsets: Map[kafka.common.TopicAndPartition,Long], messageHandler: kafka.message.MessageAndMetadata[Array[Byte],Array[Byte]] => Array[Byte])(implicit evidence$14: scala.reflect.ClassTag[Array[Byte]], implicit evidence$15: scala.reflect.ClassTag[Array[Byte]], implicit evidence$16: scala.reflect.ClassTag[kafka.serializer.DefaultDecoder], implicit evidence$17: scala.reflect.ClassTag[kafka.serializer.DefaultDecoder], implicit evidence$18: scala.reflect.ClassTag[Array[Byte]])org.apache.spark.streaming.dstream.InputDStream[Array[Byte]]
match expected type ?
[ERROR] val kafkaStream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, Array[Byte]]
Upvotes: 2
Views: 793
Reputation: 20826
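Please use kafka.common.TopicAndPartition instead of org.apache.kafka.common.TopicPartition. The Scala overload listed in the error message expects fromOffsets: Map[kafka.common.TopicAndPartition,Long].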
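For illustration, here is a minimal sketch of the call with the key type changed. It assumes the older spark-streaming-kafka integration (the org.apache.spark.streaming.kafka package), which is what the signatures in the error message suggest. The wrapper object DirectStreamSketch, the topic name "my-topic", the partition number and the starting offset are placeholders, not taken from the question.

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.DefaultDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical wrapper; the StreamingContext is assumed to be created elsewhere.
object DirectStreamSketch {
  def createStream(ssc: StreamingContext): InputDStream[Array[Byte]] = {
    val kafkaParams = Map("metadata.broker.list" -> "kafka.brokers")

    // Keys are kafka.common.TopicAndPartition, matching the Scala overload.
    // "my-topic", partition 0 and offset 0L are placeholder values.
    val fromOffsets: Map[TopicAndPartition, Long] =
      Map(TopicAndPartition("my-topic", 0) -> 0L)

    // The argument list opens on the same line as the type parameters,
    // so the whole thing parses as a single method call.
    KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, Array[Byte]](
      ssc,
      kafkaParams,
      fromOffsets,
      (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => mmd.message()
    )
  }
}
```

In a real job the offsets would come from wherever you persist them (for example a database or ZooKeeper) rather than being hard-coded.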
Upvotes: 2