I'm using spring-cloud-stream 4.0.2 with spring-cloud-stream-binder-kafka 4.0.2 and spring-cloud-stream-schema 2.2.1.RELEASE.
I'm trying to configure a Spring Cloud Stream consumer for a Kafka Avro topic with Confluent Schema Registry and manual AckMode. It works correctly when I consume messages one by one (I get a Message object with the deserialized Avro object), but when I consume a whole batch, the payload is empty.
Working setup with messages one by one:
application.yml
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: <brokerURL>
          replication-factor: 1
          consumer-properties:
            schema.registry.url: <schemaRegistryURL>
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            specific.avro.reader: true
        bindings:
          input-in-0:
            consumer:
              ack-mode: MANUAL
      default:
        group: <consumerGroup>
      bindings:
        input-in-0:
          destination: <inputKafkaTopic>
          content-type: application/*+avro
MainConsumer.java
@Component
public class MainConsumer {

    @Bean
    public Consumer<Message<MyAvroObject>> input() {
        return event -> {
            log.info("Consumed event: {}", event.getPayload()); // THIS PROPERLY RETURNS DESERIALIZED PAYLOAD
            Acknowledgment ack = event.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
            ack.acknowledge();
        };
    }
}
Setup with batch processing that doesn't work correctly (the payload is empty):
application.yml
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: <brokerURL>
          replication-factor: 1
          consumer-properties:
            schema.registry.url: <schemaRegistryURL>
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            specific.avro.reader: true
            max-poll-records: 500
        bindings:
          input-in-0:
            consumer:
              ack-mode: MANUAL
      default:
        group: <consumerGroup>
      bindings:
        input-in-0:
          consumer:
            batch-mode: true
          destination: <inputKafkaTopic>
          content-type: application/*+avro
MainConsumer.java
@Component
public class MainConsumer {

    @Bean
    public Consumer<Message<List<MyAvroObject>>> input() {
        return event -> {
            log.info("Consumed event: {}", event.getPayload()); // THIS RETURNS EMPTY LIST
            Acknowledgment ack = event.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
            ack.acknowledge();
        };
    }
}
I also tried a variation with Consumer<List<Message>>, but that just throws a ClassCastException.
How can I make it work with batch messages?
Upvotes: 2
It turns out that to make batch processing work, I needed to turn on native decoding (use-native-decoding: true) on the Spring Cloud Stream binding. I'm not sure why that is, so if anyone knows the answer, please comment :)
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: <brokerURL>
          replication-factor: 1
          consumer-properties:
            schema.registry.url: <schemaRegistryURL>
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            specific.avro.reader: true
            max-poll-records: 500
        bindings:
          input-in-0:
            consumer:
              ack-mode: MANUAL
      default:
        group: <consumerGroup>
      bindings:
        input-in-0:
          consumer:
            use-native-decoding: true
            batch-mode: true
          destination: <inputKafkaTopic>
          content-type: application/*+avro
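With this configuration the batch consumer from the question can stay unchanged. For reference, here is a minimal sketch of the "process the whole batch, then acknowledge once" pattern it relies on, written in plain Java so it runs without Spring on the classpath. Ack, Msg, ACK_HEADER and batchConsumer are hypothetical stand-ins invented for this sketch (for Acknowledgment, Message and KafkaHeaders.ACKNOWLEDGMENT); they are not part of any library. Unlike the code in the question, it also skips the acknowledgment when the header is missing instead of risking a NullPointerException.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class BatchAckSketch {

    /** Hypothetical stand-in for Spring Kafka's Acknowledgment. */
    interface Ack {
        void acknowledge();
    }

    /** Hypothetical stand-in for a Spring Message: a payload plus headers. */
    static final class Msg<T> {
        final T payload;
        final Map<String, Object> headers;

        Msg(T payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    /** Hypothetical header key; the real consumer uses KafkaHeaders.ACKNOWLEDGMENT. */
    static final String ACK_HEADER = "ack";

    /** Builds a consumer that handles every record in the batch, then acknowledges once. */
    static <T> Consumer<Msg<List<T>>> batchConsumer(Consumer<T> recordHandler) {
        return batch -> {
            for (T record : batch.payload) {
                recordHandler.accept(record);
            }
            // Acknowledge only after the whole batch was handled, and only if the header is present.
            Object ack = batch.headers.get(ACK_HEADER);
            if (ack instanceof Ack) {
                ((Ack) ack).acknowledge();
            }
        };
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        int[] ackCount = {0};
        Ack ack = () -> ackCount[0]++;
        Map<String, Object> headers = new HashMap<>();
        headers.put(ACK_HEADER, ack);

        Consumer<Msg<List<String>>> consumer = batchConsumer(seen::add);
        consumer.accept(new Msg<>(Arrays.asList("a", "b", "c"), headers));

        System.out.println(seen + " acks=" + ackCount[0]);
    }
}
```

In the real bean, the same shape would be a Consumer<Message<List<MyAvroObject>>> that loops over event.getPayload() and calls acknowledge() once at the end, as in the question.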
Upvotes: 3