ritratt
ritratt

Reputation: 1858

How to reactively process kafka messages using Micronaut-Kafka

So I have a simple Kafka consumer using Micronaut that works:

@KafkaListener
public class ReactiveKafkaConsumer {

  private static final FluentLogger log = FluentLogger.forEnclosingClass();

  @Topic("test")
  public void consume(ConsumerRecord<String, byte[]> record) {
    log.atInfo().log(record.value());
  }
}

However, when I switch this to a reactive consumer like so:

@KafkaListener
public class ReactiveKafkaConsumer {

  private static final FluentLogger log = FluentLogger.forEnclosingClass();

  @Topic("test")
  public void consume(Flux<ConsumerRecord<String, byte[]>> recordFlux) {
    log.atInfo().log("Just log. Don't process anything...");

It immediately fails with:

[Consumer clientId=xxx] Value Deserializers with error: Deserializers{keyDeserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer@e966f7d, valueDeserializer=JsonObjectSerde: org.apache.kafka.clients.consumer.ConsumerRecord} ... No bean introspection available for type [class org.apache.kafka.clients.consumer.ConsumerRecord]. Ensure the class is annotated with io.micronaut.core.annotation.Introspected

I've looked through the docs and sample code for using reactive streams is pretty limited. Has anyone managed to effectively consume Kafka messages in a reactive manner?

Upvotes: 0

Views: 67

Answers (0)

Related Questions