Reputation: 12209
I'm trying to create a simple prototype using Alpakka Kafka connector (Akka Stream Kafka).
When running the application I get the following error:
com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'kafka-clients'
I have the following code in ./src/main/scala/App.scala
:
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Source
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import scala.concurrent.Future
object App {
def main(args: Array[String]): Unit = {
println("Hello from producer")
implicit val system = ActorSystem("fakeProducer")
implicit val materializer: Materializer = ActorMaterializer()
val config = system.settings.config // ConfigFactory.load()
val producerSettings =
ProducerSettings(config, new StringSerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")
val done: Future[Done] =
Source(1 to 100)
.map(_.toString)
.map(value => new ProducerRecord[String, String]("test-basic-numbers", value))
.runWith(Producer.plainSink(producerSettings))
println("Done")
}
}
The following build.sbt
:
name := "test-akka-stream"
version := "0.1"
scalaVersion := "2.11.8"
libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "0.21.1"
I run the app using sbt run
. I have not configured any uber/assembly jar.
I'm probably missing something obvious but I cannot see it ... I suspect there is some problem with akka dependencies.
As suggested by @terminally-chill calling ProducerSettings(system, new StringSerializer, new StringSerializer)
(passing ActorSystem
instead of a configuration) solve the problem. I just don't understand if this is by design or a bug.
I have create a github issue that is already been fixed. Now documentation is more accurate and explain the correct way to create the ProducerSettings
/ConsumerSettings
.
val config = system.settings.config.getConfig("akka.kafka.producer")
val producerSettings =
ProducerSettings(config, new StringSerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")
or you can pass the ActorSystem
as explained above.
Upvotes: 2
Views: 2650
Reputation: 283
Thank you for noticing and filing an issue in the Alpakka Kafka connector project. The documentation is now updated: https://doc.akka.io/docs/akka-stream-kafka/current/producer.html
Upvotes: 2
Reputation: 12209
Thanks @terminally-chill and @murray-todd-williams for your answers. I have made some further researches and I try to summarize here:
Both ConsumerSettings
and ProducerSettings
have apply
functions that take a Config
(see here) or an ActorSystem
(see here).
The problem is that when using ActorSystem
the code is:
val config = system.settings.config.getConfig("akka.kafka.consumer")
apply(config, keyDeserializer, valueDeserializer) // call the other overload
while when using Config
the code is:
val properties = ConfigSettings.parseKafkaClientsProperties(config.getConfig("kafka-clients"))
So when passing a config directly the code search for a kafka-clients
property, instead when passing an ActorSystem
the code check for a akka.kafka.consumer/akka.kafka.producer
.
Finally consider that when creating an ActorSystem
instance by default most of the settings are loaded from an embedded reference.conf
file and merged with your application.conf
file if present. More info here.
So basically the only required property to set is usually the bootstrap.servers
.
So you can now understand why when using system.settings.config
the code doesn't work. This config instance has loaded reference.conf
(with all the defaults, see here) and custom application.conf
. The kafka-clients
property is inside akka.kafka.consumer/akka.kafka.producer
, but the code check directly for kafka-clients
.
Some possible solutions:
ActorSystem
using the other overloadsystem.settings.config.getConfig("akka.kafka.consumer")
Config
instance with the kafka-clients
sectionFor me the problem is that the official documentation provided here doesn't mention these differences and the example provided are not complete and/or not precise. Probably for an Akka expert this is clear, but for new developers this can be very confusing.
I have created a more "ready to use" example in this gist and open an issue.
Upvotes: 3
Reputation: 331
I think I've run into the same issue you did (at almost the exact same time) although I'm trying to create a basic 'hello world' Kafka consumer instead of a producer. I'm guessing you were just going through the documentation in the Alpakka Kafka connector documentation and were following the example where they first define
val config = system.settings.config
and then pass it into the new ConsumerSettings object. I'm going to guess there's a defect with the online documentation, but I'm new enough to Akka Streams (this is my first attempt to learn by example) that I'm not qualified to figure out exactly what's right or wrong.
I had tried to create and application.conf file, and then do the ConfigFactory.load() and then manually pass that to the ActorSystem upon creation, and then I passed that system to the ConsumerSettings constructor, and the error about the missing "kafka-clients" went away, but apparently I didn't have to even do that. As you said, just passing the 'system' variable instead of the 'config' variable does the trick.
Hope this helps anyone who is similarly fumbling around in the dark. I've got to say, for all the buzz there is around Akka Streams, it seems like there's a real lack of documentation out there. I might have to write a blog article once I get this stuff figured out!
Upvotes: 1
Reputation: 922
Normally I keep all my config in the AkkaSystem instance. I don't use Alpakka for Kafka, but have based many of my implementations off the source code.
Load the typesafe config object with val config = ConfigFactory.load()
and then pass config
in to val system = ActorSystem("fakeProducer", config)
.
Finally, pass system.settings.config
to ProducerSettings
.
Your current code does not pass any settings, because you have not loaded the config in to your Akka system. Your val config = system.settings.config
is referencing an empty config, which does not have a kafka-clients section (best guess).
Upvotes: 1