Mestru

Reputation: 427

How to retrieve a batch of kafka avro messages with message headers for acknowledgement using Spring Cloud Stream

I'm using spring-cloud-stream 4.0.2 with spring-cloud-stream-binder-kafka 4.0.2 and spring-cloud-stream-schema 2.2.1.RELEASE.

I'm trying to configure a Spring Cloud Stream consumer for a Kafka Avro topic with a Confluent schema registry and manual AckMode. It works correctly when I consume messages one by one (I get a Message object with the deserialized Avro object), but when consuming a whole batch, the payload is empty.

Working setup, consuming messages one by one:
application.yml

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: <brokerURL>
          replication-factor: 1
          consumer-properties:
            schema.registry.url: <schemaRegistryURL>
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            specific.avro.reader: true
        bindings:
          input-in-0:
            consumer:
              ack-mode: MANUAL
      default:
        group: <consumerGroup>
      bindings:
        input-in-0:
          destination: <inputKafkaTopic>
          content-type: application/*+avro

MainConsumer.java

import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Slf4j // Lombok provides the log field
@Component
public class MainConsumer {

    @Bean
    public Consumer<Message<MyAvroObject>> input() {
        return event -> {
            log.info("Consumed event: {}", event.getPayload()); // THIS PROPERLY RETURNS THE DESERIALIZED PAYLOAD
            Acknowledgment ack = event.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
            ack.acknowledge();
        };
    }
}

Setup with batch processing that doesn't work correctly (the payload is an empty list):
application.yml

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: <brokerURL>
          replication-factor: 1
          consumer-properties:
            schema.registry.url: <schemaRegistryURL>
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            specific.avro.reader: true
max.poll.records: 500
        bindings:
          input-in-0:
            consumer:
              ack-mode: MANUAL
      default:
        group: <consumerGroup>
      bindings:
        input-in-0:
          consumer:
            batch-mode: true
          destination: <inputKafkaTopic>
          content-type: application/*+avro

MainConsumer.java

@Component
public class MainConsumer {

    @Bean
    public Consumer<Message<List<MyAvroObject>>> input() {
        return event -> {
            log.info("Consumed event: {}", event.getPayload()); // THIS RETURNS EMPTY LIST
            Acknowledgment ack = event.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
            ack.acknowledge();
        };
    }
}

I also tried a variation with Consumer&lt;List&lt;Message&gt;&gt;, but that just throws a ClassCastException (presumably because the binder delivers a single Message whose payload is the list, not a list of Messages).

How to make it work with batch messages?

Upvotes: 2

Views: 522

Answers (1)

Mestru

Reputation: 427

It turns out that to make batch processing work, I needed to turn on native decoding in the Spring Cloud Stream binding, so that deserialization is handled by the configured Kafka deserializers rather than by Spring Cloud Stream's message converters. I'm not sure why that's required, so if anyone knows the answer to that question, please comment :)

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: <brokerURL>
          replication-factor: 1
          consumer-properties:
            schema.registry.url: <schemaRegistryURL>
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            specific.avro.reader: true
max.poll.records: 500
        bindings:
          input-in-0:
            consumer:
              ack-mode: MANUAL
      default:
        group: <consumerGroup>
      bindings:
        input-in-0:
          consumer:
            use-native-decoding: true
            batch-mode: true
          destination: <inputKafkaTopic>
          content-type: application/*+avro
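With this config, the batch consumer from the question should receive the full deserialized list. A minimal sketch (assuming Lombok's @Slf4j for the log field and MyAvroObject as the generated Avro class); note that in batch mode there is a single Acknowledgment header for the whole batch:

```java
import java.util.List;
import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Slf4j // Lombok provides the log field
@Component
public class MainConsumer {

    @Bean
    public Consumer<Message<List<MyAvroObject>>> input() {
        return batch -> {
            List<MyAvroObject> payload = batch.getPayload();
            log.info("Consumed batch of {} events", payload.size());
            // One Acknowledgment covers the whole batch: calling acknowledge()
            // commits the offsets of every record delivered in this poll.
            Acknowledgment ack = batch.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
            if (ack != null) {
                ack.acknowledge();
            }
        };
    }
}
```

If any record in the batch fails, acknowledging still commits the entire batch, so per-record error handling (e.g. a dead-letter topic) has to happen before the acknowledge() call.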

Upvotes: 3
