Reputation: 135
I have tried several pieces of documentation but have not been able to implement a Kafka consumer API using Lagom. I followed the Message Broker documentation and get the compile error "object creation impossible, since member lagomServer: LagomServer in com.lightbend.lagom.scaladsl.server.LagomServerComponents is not defined" in my loader class. Below is the code snippet of my loader class.
    class ConsumerLoader extends LagomApplicationLoader {
      override def load(context: LagomApplicationContext): LagomApplication =
        new ConsumerApplication(context) with ConfigurationServiceLocatorComponents

      override def describeService = Some(readDescriptor[ConsumerService])
    }

    abstract class ConsumerApplication(context: LagomApplicationContext)
      extends LagomApplication(context)
        with AhcWSComponents {
      lazy val kafkaService = serviceClient.implement[ConsumerService]
    }
Please point me to useful documentation on how to implement a Kafka message consumer.
Upvotes: 0
Views: 538
Reputation: 1647
I did it the following way.
When the services that read from and write to the topic are different services, or you only need to implement a reader:
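Add def topic1: Topic[Envelope] to the descriptor of the service that reads the topic:

    trait ReaderKafkaService extends Service {
      def topic1: Topic[Envelope]

      override final def descriptor: Descriptor = {
        import Service._ // brings named(...) and topic(...) into scope
        named("kafka-reader")
          .withTopics(
            topic("topic-name", topic1)
          )
          .withAutoAcl(true)
      }
    }

The consuming service has its own descriptor, which declares no topics:

    trait ConsumerService extends Service {
      override final def descriptor: Descriptor = {
        import Service._
        named("consumer-service")
          .withAutoAcl(true)
      }
    }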
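Create a client for the reader service in the application class:

    lazy val kafkaService: ReaderKafkaService = serviceClient.implement[ReaderKafkaService]

and inject it into the service implementation:

    class ServiceImpl(kafkaService: ReaderKafkaService) extends ConsumerService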
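Subscribe to the topic in the service implementation:

    import akka.Done
    import akka.stream.scaladsl.Flow
    import scala.concurrent.Future

    class ServiceImpl(kafkaService: ReaderKafkaService) extends ConsumerService {
      // Logger used by the recover stage below (SLF4J is assumed to be on the classpath).
      private val log = org.slf4j.LoggerFactory.getLogger(getClass)

      kafkaService.topic1.subscribe
        .withGroupId("group-1")
        .atLeastOnce(
          Flow[Envelope]
            // Handle one message at a time; returning Done commits the offset.
            .mapAsync(1) { envelope =>
              println(s" Message from topic: $envelope")
              Future.successful(Done)
            }
            // On failure, log the error and emit Done as the final element.
            .recover {
              case e =>
                log.error(s"Invalid message $e")
                Done
            }
        )
    }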
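The error in the question comes from the application class not defining lagomServer, which every LagomApplication has to provide. Below is a minimal sketch of the application class for the reader-only case. It assumes the ReaderKafkaService, ConsumerService and ServiceImpl definitions shown above live in the same package, that MacWire is used for wiring, and that the Kafka client dependency is on the classpath. The class name ConsumerApplication is borrowed from the question; check the component names against your Lagom version.

```scala
import com.lightbend.lagom.scaladsl.broker.kafka.LagomKafkaClientComponents
import com.lightbend.lagom.scaladsl.server.{LagomApplication, LagomApplicationContext, LagomServer}
import com.softwaremill.macwire._
import play.api.libs.ws.ahc.AhcWSComponents

abstract class ConsumerApplication(context: LagomApplicationContext)
  extends LagomApplication(context)
    with LagomKafkaClientComponents // consumer-only Kafka support
    with AhcWSComponents {

  // Client for the service whose descriptor declares the topic.
  lazy val kafkaService: ReaderKafkaService =
    serviceClient.implement[ReaderKafkaService]

  // Defining lagomServer is what resolves the
  // "object creation impossible ... lagomServer is not defined" error.
  // wire[ServiceImpl] picks up kafkaService from the enclosing scope.
  override lazy val lagomServer: LagomServer =
    serverFor[ConsumerService](wire[ServiceImpl])
}
```

The loader from the question can stay as it is, since it only instantiates this class and mixes in a service locator.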
Configuration in application.conf:

    kafka {
      bootstrap.servers = "localhost:9092"
    }
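Lagom's own Kafka client reads its broker settings from the lagom.broker.kafka section, so when the broker runs outside the Lagom dev environment it may also be necessary to point Lagom at it there. A sketch for application.conf, assuming the same localhost:9092 address as above:

```
lagom.broker.kafka {
  # An empty service name disables the service-locator lookup,
  # so the static broker list below is used instead.
  service-name = ""
  brokers = "localhost:9092"
}
```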
When you want to write to and read from the topic in the same service:
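    import akka.Done
    import akka.stream.scaladsl.Flow
    import com.lightbend.lagom.scaladsl.broker.TopicProducer
    import com.lightbend.lagom.scaladsl.persistence.PersistentEntityRegistry
    import scala.concurrent.Future

    trait ReaderWriterService extends Service {
      // The producer below publishes plain strings, so the topic is typed as String.
      def topic1: Topic[String]

      override final def descriptor: Descriptor = {
        import Service._
        named("kafka-reader-writer")
          .withTopics(
            topic("topic-name", topic1)
          )
          .withAutoAcl(true)
      }
    }

    class ServiceImpl(
      kafkaService: ReaderWriterService,
      persistentEntityRegistry: PersistentEntityRegistry // needed by the producer below
    ) extends ReaderWriterService {
      private val log = org.slf4j.LoggerFactory.getLogger(getClass)

      // Consumer side: subscribe through the service client.
      kafkaService.topic1.subscribe
        .withGroupId("group-1")
        .atLeastOnce(
          Flow[String]
            .mapAsync(1) { envelope =>
              println(s" Message from topic: $envelope")
              Future.successful(Done)
            }
            .recover {
              case e =>
                log.error(s"Invalid message $e")
                Done
            }
        )

      // Producer side: publish one message per persisted event.
      override def topic1: Topic[String] =
        TopicProducer.singleStreamWithOffset { fromOffset =>
          persistentEntityRegistry
            .eventStream(Event.Tag, fromOffset)
            .map(ev => ("Hi world", ev.offset))
        }
    }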
In the application class, mix in LagomKafkaComponents and add the service client as lazy val kafka: ReaderWriterService = serviceClient.implement[ReaderWriterService]:
    abstract class Application(context: LagomApplicationContext)
      extends LagomApplication(context)
        with CassandraPersistenceComponents
        with LagomKafkaComponents
        with AhcWSComponents {
      override lazy val lagomServer = serverFor[ReaderWriterService](wire[ServiceImpl])
      lazy val kafka: ReaderWriterService = serviceClient.implement[ReaderWriterService]
      persistentEntityRegistry.register(wire[PersistentEntity])
    }
Upvotes: 1
Reputation: 153
You can look at the documentation and the Lagom GitHub repository. AnotherServiceImpl contains the Kafka consumer logic, and the corresponding loader is defined in the AnotherApplication class in AnotherServiceSpec.scala.
Upvotes: 0