Konstantin Popov

Reputation: 1746

Missing required configuration "bootstrap.servers" error in Spark Streaming standard example

I am somewhat new to Scala and Spark, so feel free to judge me, but not too hard.

I am trying to launch the standard DirectKafkaWordCount example (provided with the Spark2 installation) to test how Spark Streaming works with Kafka.

This is the code of the example (can also be found here):

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// scalastyle:off println
package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 * Usage: DirectKafkaWordCount <brokers> <topics>
 *   <brokers> is a list of one or more Kafka brokers
 *   <topics> is a list of one or more kafka topics to consume from
 *
 * Example:
 *    $ bin/run-example streaming.DirectKafkaWordCount broker1-host:port,broker2-host:port \
 *    topic1,topic2
 */
object DirectKafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(s"""
        |Usage: DirectKafkaWordCount <brokers> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(brokers, topics) = args

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_.value)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
// scalastyle:on println

To launch it, I had to put spark-streaming-kafka-0-10_2.11-2.3.1.jar and kafka-clients-0.10.0.1.jar into the /usr/hdp/3.0.0.0-1634/spark2/jars/ directory (which surprised me somewhat, since I assumed all standard examples shipped with the installation would work out of the box, but this example complained about those missing packages). After adding the jars, I tried to read records from the topic test and count words with the command

/usr/hdp/3.0.0.0-1634/spark2/bin/run-example streaming.DirectKafkaWordCount localhost:9092 test

The application fails, however, and the error I get looks like this:

Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "bootstrap.servers" which has no default value.
        at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:421)
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:55)
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
        at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:376)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
        at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:84)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:70)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:240)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
        at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
        at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
        at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
        at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
        at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
        at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
        at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
        at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
        at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
        at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
        at org.apache.spark.examples.streaming.DirectKafkaWordCount$.main(DirectKafkaWordCount.scala:70)
        at org.apache.spark.examples.streaming.DirectKafkaWordCount.main(DirectKafkaWordCount.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:904)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

This confuses me, since I did provide the bootstrap server (localhost:9092) in the launch command. Any ideas where to dig from here?

My configuration:

Spark - 2.3.1

Kafka - 2.11-1.0.1

Upvotes: 0

Views: 6146

Answers (3)

Atul

Reputation: 3357

In case you are working with Spring Boot and Kafka and you encounter this error:

org.apache.kafka.common.config.ConfigException: Missing required configuration "bootstrap.servers" which has no default value.

Make sure you have these things in place:

  1. spring.kafka.bootstrap-servers is set in your properties or YAML file (see the sketch after this list).
  2. ZooKeeper and the Kafka server are running.
  3. A consumer is running, e.g. via kafka-console-consumer.bat/.sh (depending on your OS).
  4. spring.kafka.consumer.group-id is set.
  5. spring.kafka.consumer.auto-offset-reset=earliest is set.
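
A minimal application.properties sketch covering points 1, 4 and 5 (the broker address and group id are placeholders):

# placeholder broker address and consumer group id
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest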

Hope this helps someone.

Thanks,

Atul

Upvotes: 0

OneCricketeer

Reputation: 191844

That example hasn't been updated in over a year, but it appears you need to rename metadata.broker.list to bootstrap.servers, which is the property name all other Kafka clients use.
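
For the example above, a minimal corrected kafkaParams sketch (the group id is a placeholder; note the 0-10 consumer also requires key and value deserializers, which the shipped example does not set):

import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> brokers,                      // was: "metadata.broker.list"
  "key.deserializer" -> classOf[StringDeserializer],   // required, no default
  "value.deserializer" -> classOf[StringDeserializer], // required, no default
  "group.id" -> "direct-kafka-word-count"              // placeholder group id
)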

I'm not sure if the run-example script passes the arguments correctly anyway, but you'll want to give the external IP or hostname of the Kafka broker(s), not localhost.

Also, Structured Streaming and the DataFrame API are recommended in Spark 2+ over DStreams and RDDs.
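
For reference, a minimal Structured Streaming sketch of the same word count, assuming the matching spark-sql-kafka-0-10 package is on the classpath (e.g. org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 via --packages); the broker address and topic are placeholders:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("StructuredKafkaWordCount").getOrCreate()
import spark.implicits._

// Read the Kafka topic as a streaming Dataset of message values
val lines = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1-host:9092") // placeholder broker
  .option("subscribe", "test")                            // placeholder topic
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String]

// Split into words and count occurrences
val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

// Print the running counts to the console
wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()
  .awaitTermination()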

Upvotes: 1

Harneet Singh

Reputation: 2556

You need to add bootstrap.servers to the Kafka params, because with spark-streaming-kafka-0-10_2.11-2.3.1.jar the consumer requires bootstrap servers to consume messages from any topic:

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "alpha-kafka-1.com:9092,alpha-kafka-2.com:9092,alpha-kafka-3.com:9092"
)

Resource: https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#creating-a-direct-stream
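
Note that bootstrap.servers alone is still not enough for the 0-10 consumer: key and value deserializers have no default either, and the linked guide also recommends a separate group id per stream. A fuller sketch along the lines of that guide:

import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "alpha-kafka-1.com:9092,alpha-kafka-2.com:9092,alpha-kafka-3.com:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)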

Upvotes: 1
