Abdul

Reputation: 51

Can we achieve exactly-once processing of messages using a Spring Kafka batch listener?

I am trying to achieve exactly-once processing of each message on a Kafka topic. Here is my configuration:

config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 120000);
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 25);
config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 4096000);
config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 120000);
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 600000);  
config.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 8192000);

I have set the acknowledgment mode to MANUAL and the concurrency to 2.

Yet it consumes messages more than once. Has anyone faced this issue? Also, with the above configuration, the consumer always receives only one message per batch. I have tried increasing fetch.min.bytes and fetch.max.wait.ms, but it has no effect.

The batch problem was resolved after I changed the ConcurrentKafkaListenerContainerFactory as follows:

ConcurrentKafkaListenerContainerFactory<String, String> factory =
    new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());

factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(3600000);

factory.getContainerProperties().setAckMode(org.springframework.kafka.listener.ContainerProperties.AckMode.MANUAL);
factory.setMessageConverter(new BatchMessagingMessageConverter(stringJsonMessageConverter()));

Upvotes: 2

Views: 932

Answers (2)

raksh93

Reputation: 67

Kafka provides at-least-once delivery by default, so making your consumer idempotent gives you effectively exactly-once processing. Let's say your system looks like this:

producer -> topic1 -> consumer1 -> topic2 -> consumer2 -> topic3 -> consumer3

Let's say the final processing is done in consumer3. Then even if the intermediate consumers process a message multiple times, an idempotence check in consumer3 ensures that each message takes effect exactly once.

However, this assumes that it's okay to process the same message multiple times in the intermediate consumers. This approach might be easier in a simpler system with only a handful of consumers/topics. If the number of consumers increases, adding idempotence checks at each consumer could get cumbersome.
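As a rough illustration of the idempotence check described above, here is a minimal sketch. It assumes every message carries a stable business identifier (the `messageId` parameter) that survives the intermediate hops; the class `IdempotentProcessor` and its methods are hypothetical and not part of Spring Kafka or the Kafka client. The in-memory set is only for demonstration: a real consumer would need a durable store (for example a database table with a unique constraint) updated together with the side effect.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper illustrating an idempotent final consumer (consumer3).
final class IdempotentProcessor {

    // IDs of messages whose effect has already been applied.
    // Assumption: in-memory only; a real system needs durable storage.
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();

    // Stand-in for the real side effect (DB write, API call, ...).
    private final List<String> appliedPayloads = new ArrayList<>();

    /**
     * Applies the payload unless this messageId was seen before.
     * Returns true if the payload was applied, false for a duplicate.
     */
    boolean process(String messageId, String payload) {
        // Set.add returns false when the ID is already present,
        // which makes the check-and-mark step atomic per ID.
        if (!processedIds.add(messageId)) {
            return false; // redelivered message: skip the side effect
        }
        synchronized (appliedPayloads) {
            appliedPayloads.add(payload);
        }
        return true;
    }

    /** Returns a copy of the payloads that were actually applied. */
    List<String> appliedPayloads() {
        synchronized (appliedPayloads) {
            return new ArrayList<>(appliedPayloads);
        }
    }
}
```

Inside a batch listener, each record in the batch would be passed through `process` before the batch is acknowledged, so a redelivered batch only re-applies the records that were not handled the first time.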

Upvotes: 0

Gary Russell

Reputation: 174554

To get exactly-once semantics, you have to use transactions.

However, exactly-once semantics only apply to

read from Kafka -> process -> write to Kafka

and even then, they only apply to the whole flow (read/process/write).

The read and process steps alone could be called multiple times for the same record (if the process or write step fails). The only guarantee is that the flow as a whole will take effect exactly once.
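For reference, these are the standard Kafka client settings that a transactional read/process/write flow typically relies on. This is only a sketch of the raw property keys: the class `ExactlyOnceConfigSketch` and the example transactional id are made up for illustration, and in Spring Kafka the transactional id is normally derived from a transaction id prefix set on the producer factory rather than set by hand.

```java
import java.util.Properties;

// Hypothetical holder for the property keys involved; not a Spring Kafka class.
final class ExactlyOnceConfigSketch {

    /** Consumer side: offsets are committed via the transaction, not auto-commit. */
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("enable.auto.commit", "false");
        // Only read records from committed transactions.
        props.put("isolation.level", "read_committed");
        return props;
    }

    /** Producer side: a transactional.id turns on transactional sends. */
    static Properties producerProps(String transactionalId) {
        Properties props = new Properties();
        props.put("enable.idempotence", "true");
        props.put("acks", "all");
        props.put("transactional.id", transactionalId);
        return props;
    }
}
```

With these in place, the consumed offsets and the produced records are committed or aborted together, which is why the guarantee covers the whole flow and not the listener invocation on its own.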

Upvotes: 1
