Davide Icardi

Reputation: 12209

Akka Stream Kafka: No configuration setting found for key 'kafka-clients'

I'm trying to create a simple prototype using the Alpakka Kafka connector (Akka Stream Kafka).

When running the application I get the following error:

com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'kafka-clients'

I have the following code in ./src/main/scala/App.scala:

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Source
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

import scala.concurrent.Future

object App {
  def main(args: Array[String]): Unit = {
    println("Hello from producer")

    implicit val system = ActorSystem("fakeProducer")
    implicit val materializer: Materializer = ActorMaterializer()

    val config = system.settings.config // ConfigFactory.load()

    val producerSettings =
      ProducerSettings(config, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

    val done: Future[Done] =
      Source(1 to 100)
        .map(_.toString)
        .map(value => new ProducerRecord[String, String]("test-basic-numbers", value))
        .runWith(Producer.plainSink(producerSettings))


    println("Done")
  }
}

The following build.sbt:

name := "test-akka-stream"

version := "0.1"

scalaVersion := "2.11.8"

libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "0.21.1"

I run the app using sbt run. I have not configured any uber/assembly jar.

I'm probably missing something obvious, but I cannot see it. I suspect there is some problem with the Akka dependencies.

UPDATE

As suggested by @terminally-chill, calling ProducerSettings(system, new StringSerializer, new StringSerializer) (passing the ActorSystem instead of a configuration) solves the problem. I just don't understand whether this is by design or a bug.

UPDATE 2

I created a GitHub issue, which has already been fixed. The documentation is now more accurate and explains the correct way to create the ProducerSettings/ConsumerSettings:

val config = system.settings.config.getConfig("akka.kafka.producer")
val producerSettings =
  ProducerSettings(config, new StringSerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

or you can pass the ActorSystem as explained above.

Upvotes: 2

Views: 2650

Answers (4)

Enno

Reputation: 283

Thank you for noticing and filing an issue in the Alpakka Kafka connector project. The documentation is now updated: https://doc.akka.io/docs/akka-stream-kafka/current/producer.html

Upvotes: 2

Davide Icardi

Reputation: 12209

Thanks @terminally-chill and @murray-todd-williams for your answers. I did some further research, which I'll try to summarize here:

Both ConsumerSettings and ProducerSettings have apply functions that take a Config (see here) or an ActorSystem (see here).

The problem is that when using ActorSystem the code is:

val config = system.settings.config.getConfig("akka.kafka.consumer")
apply(config, keyDeserializer, valueDeserializer) // call the other overload

while when using Config the code is:

val properties = ConfigSettings.parseKafkaClientsProperties(config.getConfig("kafka-clients"))

So when a Config is passed directly, the code looks for a kafka-clients property at the top level of that Config, whereas when an ActorSystem is passed, the code first narrows the configuration to akka.kafka.consumer/akka.kafka.producer.

Finally, consider that when you create an ActorSystem instance, most of the settings are loaded by default from an embedded reference.conf file and merged with your application.conf file, if present. More info here. So the only property you usually need to set is bootstrap.servers.
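As a rough illustration of the point above, an application.conf that overrides only the broker address might look like the fragment below. The nesting under akka.kafka.producer mirrors the layout described in this answer, and the broker address is the one from the question. Setting it here rather than through withBootstrapServers is a matter of preference, not something the thread prescribes.

```hocon
# src/main/resources/application.conf
# Merged on top of the library's reference.conf when the ActorSystem starts.
akka.kafka.producer {
  kafka-clients {
    bootstrap.servers = "localhost:9092"
  }
}
```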

This explains why the code doesn't work when using system.settings.config. That Config instance has loaded reference.conf (with all the defaults, see here) and your custom application.conf. The kafka-clients property is nested inside akka.kafka.consumer/akka.kafka.producer, but the code looks for kafka-clients directly at the root.
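The lookup mismatch can be reproduced with the Typesafe Config library alone. The sketch below is only an illustration: the HOCON string is a hand-written stand-in for the merged configuration (the real reference.conf has many more keys), and the object name ConfigPathDemo and the acks entry are made up for the example.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object ConfigPathDemo {
  // Hand-written stand-in for the merged configuration; the real
  // reference.conf contains many more keys than this.
  val root: Config = ConfigFactory.parseString(
    """
      |akka.kafka.producer {
      |  kafka-clients {
      |    # illustrative entry only
      |    acks = "all"
      |  }
      |}
      |""".stripMargin)

  // The lookup performed when the whole config is passed in:
  // kafka-clients is searched for at the top level.
  val rootHasKafkaClients: Boolean = root.hasPath("kafka-clients")

  // The same lookup after narrowing to the producer section first.
  val producerSection: Config = root.getConfig("akka.kafka.producer")
  val sectionHasKafkaClients: Boolean = producerSection.hasPath("kafka-clients")

  def main(args: Array[String]): Unit = {
    println(s"root has kafka-clients:    $rootHasKafkaClients")
    println(s"section has kafka-clients: $sectionHasKafkaClients")
  }
}
```

With this stand-in config the first check is false and the second is true, which matches the ConfigException$Missing seen when the root config is passed.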

Some possible solutions:

  • Directly pass the ActorSystem using the other overload
  • Pass the right section using system.settings.config.getConfig("akka.kafka.consumer")
  • Manually construct a Config instance with the kafka-clients section
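The three options above could be sketched roughly as follows for the producer side, assuming akka-stream-kafka 0.21.1 as in the question's build.sbt. The object and method names are hypothetical. For the third option I assume the hand-built Config still needs the other producer defaults, so the sketch layers it over the akka.kafka.producer section with withFallback instead of supplying kafka-clients alone.

```scala
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.kafka.common.serialization.StringSerializer

// Hypothetical helper object; only the ProducerSettings calls come from the thread.
object ProducerSettingsOptions {

  // Option 1: pass the ActorSystem and let the library pick akka.kafka.producer.
  def fromSystem(system: ActorSystem): ProducerSettings[String, String] =
    ProducerSettings(system, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

  // Option 2: pass the producer section explicitly.
  def fromSection(system: ActorSystem): ProducerSettings[String, String] = {
    val section: Config = system.settings.config.getConfig("akka.kafka.producer")
    ProducerSettings(section, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")
  }

  // Option 3: build a Config with a kafka-clients section by hand.
  // Assumption: the remaining producer settings are taken from the defaults via fallback.
  def fromManualConfig(system: ActorSystem): ProducerSettings[String, String] = {
    val overrides: Config = ConfigFactory.parseString(
      """kafka-clients { bootstrap.servers = "localhost:9092" }""")
    val config: Config =
      overrides.withFallback(system.settings.config.getConfig("akka.kafka.producer"))
    ProducerSettings(config, new StringSerializer, new StringSerializer)
  }
}
```

The consumer side should be analogous, using ConsumerSettings, deserializers, and the akka.kafka.consumer section.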

For me, the problem is that the official documentation provided here doesn't mention these differences, and the examples provided are incomplete and/or imprecise. This is probably clear to an Akka expert, but it can be very confusing for new developers.

I have created a more "ready to use" example in this gist and opened an issue.

Upvotes: 3

Murray Todd Williams

Reputation: 331

I think I've run into the same issue you did (at almost the exact same time), although I'm trying to create a basic 'hello world' Kafka consumer instead of a producer. I'm guessing you were going through the Alpakka Kafka connector documentation and following the example where they first define

val config = system.settings.config

and then pass it into the new ConsumerSettings object. I'm going to guess there's a defect with the online documentation, but I'm new enough to Akka Streams (this is my first attempt to learn by example) that I'm not qualified to figure out exactly what's right or wrong.

I had tried to create an application.conf file, call ConfigFactory.load(), and manually pass the result to the ActorSystem upon creation. I then passed that system to the ConsumerSettings constructor, and the error about the missing "kafka-clients" went away, but apparently I didn't even have to do that. As you said, just passing the 'system' variable instead of the 'config' variable does the trick.

Hope this helps anyone who is similarly fumbling around in the dark. I've got to say, for all the buzz there is around Akka Streams, it seems like there's a real lack of documentation out there. I might have to write a blog article once I get this stuff figured out!

Upvotes: 1

emran

Reputation: 922

Normally I keep all my config in the ActorSystem instance. I don't use Alpakka for Kafka, but I have based many of my implementations on its source code.

Load the Typesafe Config object with val config = ConfigFactory.load() and then pass config into val system = ActorSystem("fakeProducer", config).

Finally, pass system.settings.config to ProducerSettings.

Your current code does not pass any settings, because you have not loaded the config into your Akka system. Your val config = system.settings.config is referencing an empty config, which does not have a kafka-clients section (best guess).

Upvotes: 1
