codehammer

Reputation: 885

Scala error with Spark Streaming Kafka: ambiguous reference to overloaded definition

I'm trying to create a Kafka direct stream in my Spark Streaming module, supplying the starting offsets externally, but the call fails with the compilation error below.

Here is the code that creates the Kafka direct stream:

val kafkaParams = Map("metadata.broker.list" -> "kafka.brokers")
// testing only
val fromOffsets: Map[TopicPartition, Long] = Map[TopicPartition, Long]()

val kafkaStream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, Array[Byte]]
    (ssc, kafkaParams, fromOffsets, (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => mmd.message())

Here is the compilation error I'm running into. Any ideas or pointers?

    ambiguous reference to overloaded definition,
both method createDirectStream in object KafkaUtils of type (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext, keyClass: Class[Array[Byte]], valueClass: Class[Array[Byte]], keyDecoderClass: Class[kafka.serializer.DefaultDecoder], valueDecoderClass: Class[kafka.serializer.DefaultDecoder], recordClass: Class[Array[Byte]], kafkaParams: java.util.Map[String,String], fromOffsets: java.util.Map[kafka.common.TopicAndPartition,Long], messageHandler: org.apache.spark.api.java.function.Function[kafka.message.MessageAndMetadata[Array[Byte],Array[Byte]],Array[Byte]])org.apache.spark.streaming.api.java.JavaInputDStream[Array[Byte]]
and  method createDirectStream in object KafkaUtils of type (ssc: org.apache.spark.streaming.StreamingContext, kafkaParams: Map[String,String], fromOffsets: Map[kafka.common.TopicAndPartition,Long], messageHandler: kafka.message.MessageAndMetadata[Array[Byte],Array[Byte]] => Array[Byte])(implicit evidence$14: scala.reflect.ClassTag[Array[Byte]], implicit evidence$15: scala.reflect.ClassTag[Array[Byte]], implicit evidence$16: scala.reflect.ClassTag[kafka.serializer.DefaultDecoder], implicit evidence$17: scala.reflect.ClassTag[kafka.serializer.DefaultDecoder], implicit evidence$18: scala.reflect.ClassTag[Array[Byte]])org.apache.spark.streaming.dstream.InputDStream[Array[Byte]]
match expected type ?
[ERROR]     val kafkaStream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, Array[Byte]]

Upvotes: 2

Views: 793

Answers (1)

zsxwing

Reputation: 20826

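Please use kafka.common.TopicAndPartition instead of org.apache.kafka.common.TopicPartition. As the signature in your error message shows, the Scala overload of createDirectStream expects fromOffsets to be a Map[kafka.common.TopicAndPartition, Long].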
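
For illustration, here is a minimal sketch of the call with the key type swapped, assuming the Kafka 0.8 direct-stream integration (the org.apache.spark.streaming.kafka package that the signatures in the error message come from). The application name, the local master, the batch interval, the topic name "my-topic" and the starting offset are placeholders of mine, not values from the question.

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.DefaultDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder context setup; use your own configuration here.
    val conf = new SparkConf().setAppName("direct-stream-sketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    val kafkaParams = Map("metadata.broker.list" -> "kafka.brokers")

    // The keys are kafka.common.TopicAndPartition, matching the
    // fromOffsets parameter of the Scala overload.
    // "my-topic", partition 0 and offset 0L are hypothetical values.
    val fromOffsets: Map[TopicAndPartition, Long] =
      Map(TopicAndPartition("my-topic", 0) -> 0L)

    // The argument list opens on the same line as the closing "]".
    // A line break between "]" and "(" lets Scala's newline rules end
    // the statement before the arguments are read.
    val kafkaStream = KafkaUtils.createDirectStream[
      Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, Array[Byte]](
      ssc,
      kafkaParams,
      fromOffsets,
      (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => mmd.message())

    // Print a per-batch record count so the stream has an output operation.
    kafkaStream.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Running this needs the spark-streaming-kafka 0.8 artifact on the classpath and a reachable broker in place of "kafka.brokers".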

Upvotes: 2
