covfefe
covfefe

Reputation: 2665

AbstractMethodError creating Kafka stream

I'm trying to open a Kafka (tried versions 0.11.0.2 and 1.0.1) stream using createDirectStream method and getting this AbstractMethodError error:

Exception in thread "main" java.lang.AbstractMethodError
    at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.initializeLogIfNecessary(KafkaUtils.scala:39)
    at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.log(KafkaUtils.scala:39)
    at org.apache.spark.internal.Logging$class.logWarning(Logging.scala:66)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.logWarning(KafkaUtils.scala:39)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.fixKafkaParams(KafkaUtils.scala:201)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.<init>(DirectKafkaInputDStream.scala:63)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:147)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:124)

This is how I'm calling it:

val preferredHosts = LocationStrategies.PreferConsistent
    val kafkaParams = Map(
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[IntegerDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest"
    )

    val aCreatedStream = createDirectStream[String, String](ssc, preferredHosts,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

I have Kafka running on 9092 and I'm able to create producers and consumers and pass messages between them so not sure why it's not working from Scala code. Any ideas appreciated.

Upvotes: 18

Views: 12463

Answers (3)

la_femme_it
la_femme_it

Reputation: 672

I recieved same error. I set my dependencies same version as my spark interpreter is

%spark2.dep
z.reset()
z.addRepo("MavenCentral").url("https://mvnrepository.com/")

z.load("org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.0")
z.load("org.apache.kafka:kafka-clients:2.3.0")

Upvotes: 1

Oliv
Oliv

Reputation: 10812

I had the same exception, in my case I created the application jar with dependency to spark-streaming-kafka-0-10_2.11 of version 2.1.0, while trying to deploy to Spark 2.3.0 cluster.

Upvotes: 6

covfefe
covfefe

Reputation: 2665

Turns out I was using Spark 2.3 and I should've been using Spark 2.2. Apparently that method was made abstract in the later version so I was getting that error.

Upvotes: 21

Related Questions