Luka Špoljarić

Reputation: 999

Kafka consumer manual commit offset

I'm implementing a Spring Integration Java DSL flow that consumes messages from Kafka.

Code snippet:

return IntegrationFlows.from(
                Kafka.messageDrivenChannelAdapter(new DefaultKafkaConsumerFactory(kafkaTelemetryDataConsumerConfiguration.getConsumerProperties()),
                        kafkaPropertiesConfiguration.getTelemetryDataTopic()))
                .handle(bla.someImportantOperation())
                // TODO: do manual commit here
                //.handle(consumer.commitSync())
                .get();

I would like to know how I can call commitSync manually, but only after .handle(bla.someImportantOperation()) has finished successfully.

I don't know how to get a reference to the consumer, since I'm using DefaultKafkaConsumerFactory. I would appreciate any help.

These are the consumerProperties I use to create the consumer:

consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaPropertiesConfiguration.getBootstrapServers());
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

consumerProperties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, kafkaPropertiesConfiguration.getClientIdConfig());
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, kafkaPropertiesConfiguration.getGroupIdConfig());

consumerProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

Upvotes: 2

Views: 2178

Answers (1)

Artem Bilan

Reputation: 121262

Kafka.messageDrivenChannelAdapter() provides a configurer hook for the listener container:

.configureListenerContainer(c ->
                                c.ackMode(ContainerProperties.AckMode.MANUAL))

Pay attention to the AckMode.MANUAL option I set there.

Read its Javadocs and then those of AcknowledgingMessageListener. They mention the Acknowledgment, which is present in the message headers under KafkaHeaders.ACKNOWLEDGMENT.

So what you need in place of your //.handle(consumer.commitSync()) is something like this:

.handle(m -> m.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class).acknowledge())

See more info in Spring for Apache Kafka Docs: https://docs.spring.io/spring-kafka/docs/2.2.0.RELEASE/reference/html/_reference.html#committing-offsets
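
To make the ordering concrete, here is a minimal plain-Java sketch of the idea, with no Spring on the classpath. It only models the behaviour: the business step runs first, and the acknowledgment carried in the headers is invoked only if that step did not throw. The names Ack, RecordingAck, ManualAckSketch, ACK_HEADER, process and deliver are hypothetical stand-ins made up for this illustration; they are not the Spring Kafka Acknowledgment interface or the KafkaHeaders.ACKNOWLEDGMENT constant.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for an acknowledgment callback; not Spring Kafka's interface.
interface Ack {
    void acknowledge();
}

// Records whether acknowledge() was called so the outcome can be observed.
class RecordingAck implements Ack {
    boolean acknowledged = false;

    @Override
    public void acknowledge() {
        acknowledged = true;
    }
}

class ManualAckSketch {
    // Hypothetical header key, standing in for KafkaHeaders.ACKNOWLEDGMENT.
    static final String ACK_HEADER = "acknowledgment";

    // Runs the business step first, then acknowledges. If the business step
    // throws, the exception propagates and acknowledge() is never reached.
    static void process(String payload, Map<String, Object> headers,
                        Consumer<String> importantOperation) {
        importantOperation.accept(payload);
        Ack ack = (Ack) headers.get(ACK_HEADER);
        if (ack != null) {
            ack.acknowledge();
        }
    }

    // Builds a message with a fresh acknowledgment in its headers, runs it
    // through process(), and reports whether it ended up acknowledged.
    static boolean deliver(String payload, Consumer<String> importantOperation) {
        RecordingAck ack = new RecordingAck();
        Map<String, Object> headers = new HashMap<>();
        headers.put(ACK_HEADER, ack);
        try {
            process(payload, headers, importantOperation);
        } catch (RuntimeException e) {
            // The business step failed, so nothing was acknowledged.
        }
        return ack.acknowledged;
    }

    public static void main(String[] args) {
        // Successful step: the message is acknowledged.
        System.out.println(deliver("ok", p -> { }));
        // Failing step: the acknowledgment is skipped.
        System.out.println(deliver("bad", p -> {
            throw new IllegalStateException("failed");
        }));
    }
}
```

Running main prints true and then false. In the real flow the same ordering comes from placing the acknowledging .handle(...) after .handle(bla.someImportantOperation()), so a failure in the first handler means the second one is not invoked for that message.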

Upvotes: 4
