Reputation: 51
I am trying to achieve exactly-once processing of each message on a Kafka topic. Here is my configuration:
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 120000);
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 25);
config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 4096000);
config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 120000);
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 600000);
config.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 8192000);
I have set the acknowledgment mode to MANUAL and the concurrency to 2.
Yet the consumer still processes some messages more than once. Has anyone faced this issue?
Also, with the above configuration, the consumer always receives only one message per batch. I have tried increasing fetch.min.bytes and fetch.max.wait.ms, but it doesn't have any impact.
The batch problem was resolved after I changed the ConcurrentKafkaListenerContainerFactory as follows:
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(3600000);
factory.getContainerProperties().setAckMode(org.springframework.kafka.listener.ContainerProperties.AckMode.MANUAL);
factory.setMessageConverter(new BatchMessagingMessageConverter(stringJsonMessageConverter()));
Upvotes: 2
Views: 932
Reputation: 67
Kafka offers at-least-once processing by default, so making your consumer idempotent gives you effectively exactly-once processing. Let's say your system looks like
producer -> topic1 -> consumer1 -> topic2 -> consumer2 -> topic3 -> consumer3
Let's say the final processing is done in consumer3. Then even if the intermediate consumers process a message multiple times, adding idempotence in consumer3 makes sure the final effect of each message is applied exactly once.
However, this assumes that it's okay to process the same message multiple times in the intermediate consumers. This approach might be easier in a simpler system with only a handful of consumers/topics. If the number of consumers increases, adding idempotence checks at each consumer could get cumbersome.
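As a rough illustration of what an idempotence check in consumer3 could look like, here is a minimal sketch in plain Java. IdempotentProcessor and the message ID format are hypothetical names, not part of Kafka or Spring. It assumes every message carries a stable unique ID (for example topic-partition-offset or a business key). The in-memory set is only for demonstration; in a real consumer the set of processed IDs would need to live in durable storage and be updated atomically with the side effect.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical helper: applies a side effect at most once per message ID.
class IdempotentProcessor<T> {
    // Assumption: kept in memory for the sketch; use a durable store in practice.
    private final Set<String> processedIds = new HashSet<>();
    private final Consumer<T> sideEffect;

    IdempotentProcessor(Consumer<T> sideEffect) {
        this.sideEffect = sideEffect;
    }

    // Returns true if the side effect ran, false if the ID was already processed.
    synchronized boolean process(String messageId, T payload) {
        if (processedIds.contains(messageId)) {
            return false; // duplicate delivery, skip it
        }
        sideEffect.accept(payload);
        // Record the ID only after the side effect succeeded, so that a failure
        // leaves the message eligible for a retry on redelivery.
        processedIds.add(messageId);
        return true;
    }
}
```

A listener would call process(id, payload) for every record and acknowledge the record regardless of whether the call returned true or false, since a false result just means the work was already done.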
Upvotes: 0
Reputation: 174554
To get exactly once semantics, you have to use transactions.
However, exactly once semantics only applies to
read from Kafka -> process -> write to Kafka
and even then, it only applies to the whole flow (read/process/write).
The read and process steps alone could be called multiple times for the same record (if the process or write step fails and the transaction is rolled back). The only guarantee is that the result of the whole flow is committed exactly once.
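For reference, a sketch of the client settings that a transactional read/process/write flow typically relies on, written with plain java.util.Properties and the standard Kafka config keys. The class name and the transactional.id value are made up for illustration, and the broker address is copied from the question. With the plain Kafka client, the producer would then call initTransactions(), and for each batch beginTransaction(), send(...), sendOffsetsToTransaction(...) and commitTransaction(), so the output records and the consumed offsets commit together. With Spring, the container and a transaction manager can drive these calls for you.

```java
import java.util.Properties;

// Hypothetical holder class for the two sets of client properties.
class ExactlyOnceConfigSketch {

    static Properties producerProps() {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "localhost:9092");
        // Idempotent producer: broker de-duplicates retried sends.
        p.setProperty("enable.idempotence", "true");
        // Setting a transactional.id enables transactions;
        // the value here is an assumption and must be unique per producer instance.
        p.setProperty("transactional.id", "my-tx-producer-1");
        return p;
    }

    static Properties consumerProps() {
        Properties c = new Properties();
        c.setProperty("bootstrap.servers", "localhost:9092");
        // Offsets are committed through the producer's transaction, not by the consumer.
        c.setProperty("enable.auto.commit", "false");
        // Only read records from committed transactions.
        c.setProperty("isolation.level", "read_committed");
        return c;
    }
}
```

Note that read_committed matters for the downstream consumer of the output topic: without it, that consumer can still see records from transactions that were later aborted.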
Upvotes: 1