Manish Kumar
Manish Kumar

Reputation: 135

How to implement kafka consumer using lagom framework

Tried different documentation but not able to implement kafka consumer api using lagom. Followed Message broker and getting Object creation impossible, since member lagomServer: LagomServer in com.lightbend.lagom.scaladsl.server.LagomServerComponents is not defined in loader class. Below is the code snippet of my loader class.

class ConsumerLoader extends LagomApplicationLoader {

  override def load(context: LagomApplicationContext): LagomApplication =
    new ConsumerApplication(context) with ConfigurationServiceLocatorComponents

  override def describeService = Some(readDescriptor[ConsumerService])
}

abstract class ConsumerApplication(context: LagomApplicationContext)
  extends LagomApplication(context)
    with AhcWSComponents {

  lazy val kafkaService = serviceClient.implement[ConsumerService]
}

Please provide me with the useful documentation link on how to implement kafka message consumer.

Upvotes: 0

Views: 538

Answers (2)

Vladislav Kievski
Vladislav Kievski

Reputation: 1647

I did it following way:

When Service that read and write to the topic is different services or you need just implement reader:

  1. Add topic method def topic: Topic[Envelope] into service reader
trait ReaderKafkaService extends Service {
  def topic1: Topic[Envelope]

  override final def descriptor: Descriptor = {

    named("kafka-reader")
      .withTopics(
        topic("topic-name", topic1)
      )
      .withAutoAcl(true)
  }
}
  1. Service where you what to read from kafka:
ConsumerService extends Service {
  override final def descriptor: Descriptor = {

    named("consumer-service")
      .withAutoAcl(true)
  }
}
  1. Add in Loader this service:
lazy val kafkaService: ReaderKafkaService = serviceClient.implement[ReaderKafkaService]
  1. Inject created service in your impl:

class ServiceImpl(
    kafkaService: ReaderKafkaService,
) extends ConsumerService
  1. Subscribe to topic
class ServiceImpl(
    kafkaService: ReaderKafkaService,
) extends ConsumerService {

  kafkaService.topic1.subscribe
    .withGroupId("group-1")
    .atLeastOnce(
      Flow[Envelope]
        .mapAsync(1) {
          case envelope: Envelope =>
              println(s" Message from topic: $envelope") 
              Future.successful(Done)
        }
        .recover {
          case e =>
            log.error(s"Invalid message $e")
            Done
        }
    )
}
  1. If needed add in config configuration for kafka
kafka {
  bootstrap.servers = "localhost:9092"
}

When you want to write and read from the same service:

  1. Add service method:
trait ReaderWriterService extends Service {
  def topic1: Topic[Envelope]

  override final def descriptor: Descriptor = {

    named("kafka-reader-writer")
      .withTopics(
        topic("topic-name", topic1)
      )
      .withAutoAcl(true)
  }
}
  1. Service impl:
class ServiceImpl(
    kafkaService: ReaderWriterService,
) extends ReaderWriterService {

  kafkaService.topic1.subscribe
    .withGroupId("group-1")
    .atLeastOnce(
      Flow[Envelope]
        .mapAsync(1) {
          case envelope: String =>
              println(s" Message from topic: $envelope") 
              Future.successful(Done)
        }
        .recover {
          case e =>
            log.error(s"Invalid message $e")
            Done
        }
    )

  override def topic1(): Topic[String] =
    TopicProducer.singleStreamWithOffset { fromOffset =>
      persistentEntityRegistry
        .eventStream(Event.Tag, fromOffset)
        .map(ev => ("Hi world", ev.offset))
    }
}
  1. In the loader you need to extend with LagomKafkaComponents and add this as service lazy val kafka: ProfileService = serviceClient.implement[ProfileService]
abstract class Application(context: LagomApplicationContext)
  extends LagomApplication(context)
    with CassandraPersistenceComponents
    with LagomKafkaComponents
    with AhcWSComponents {

  override lazy val lagomServer = serverFor[ReaderWriterService](wire[ServiceImpl])

  lazy val kafka: ReaderWriterService = serviceClient.implement[ReaderWriterService]

  persistentEntityRegistry.register(wire[PersistentEntity])
}

Upvotes: 1

Vadzim Marchanka
Vadzim Marchanka

Reputation: 153

You can try to look at the documentation and the lagom github repository. AnotherServiceImpl contains logic of Kafka consumer. The appropriate loader is defined in AnotherServiceSpec.scala in AnotherApplication class.

Upvotes: 0

Related Questions