Reputation: 711
I'm using Spring Boot (2.1.6.RELEASE) with spring-kafka (2.2.7.RELEASE), and I'm sending messages to my Kafka cluster using KafkaTemplate. Sometimes (usually when I restart a Kafka broker or trigger a rebalance) I see errors like this when sending messages:
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
Given the default Kafka producer configs, I expect failed sends to be retried, but they are not. The default Kafka producer configs are:
retries: 2147483647 (https://kafka.apache.org/documentation/#retries)
acks: 1 (https://kafka.apache.org/documentation/#acks)
My config is this:
@Bean
public Map<String, Object> producerConfigs()
{
    // See https://kafka.apache.org/documentation/#producerconfigs for more properties
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
    return props;
}

@Bean
public ProducerFactory<Long, String> producerFactory()
{
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public KafkaTemplate<Long, String> kafkaTemplate(KafkaTemplateProducerListener<Long, String> kafkaTemplateProducerListener,
                                                 ProducerFactory<Long, String> producerFactory)
{
    KafkaTemplate<Long, String> kafkaTemplate = new KafkaTemplate<>(producerFactory);
    kafkaTemplate.setProducerListener(kafkaTemplateProducerListener);
    return kafkaTemplate;
}
and I'm sending messages like this:
kafkaTemplate.send(topicName, key, body);
I have searched all over the internet, and everyone says that this configuration, with the default retries and acks, should work, but it doesn't. What am I missing?
Thanks
Upvotes: 3
Views: 2282
Reputation: 711
After spending some time debugging this, I found the solution:
props.put(ProducerConfig.ACKS_CONFIG, "all");
For more information about this property: https://kafka.apache.org/documentation/#acks
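For reference, here is a minimal sketch of producer properties with that setting made explicit. It uses the plain string keys ("acks", "retries", and so on) instead of the ProducerConfig constants so that it compiles without kafka-clients on the classpath. The class name, the helper method, the broker address, and the explicit retries and retry.backoff.ms values are assumptions for illustration only, not part of my actual config.

```java
import java.util.HashMap;
import java.util.Map;

class ReliableProducerProps {

    // Hypothetical helper: builds producer properties that favour durability.
    public static Map<String, Object> reliableProducerProps(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        // Equivalent to ProducerConfig.ACKS_CONFIG: wait for all in-sync replicas.
        props.put("acks", "all");
        // Assumption: set retries explicitly rather than relying on the client default.
        props.put("retries", Integer.MAX_VALUE);
        // Assumption: illustrative pause (in milliseconds) between retry attempts.
        props.put("retry.backoff.ms", 100);
        props.put("compression.type", "gzip");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder broker address.
        Map<String, Object> props = reliableProducerProps("localhost:9092");
        System.out.println("acks = " + props.get("acks"));
    }
}
```

The resulting map can be merged into the one passed to DefaultKafkaProducerFactory, alongside the serializer settings. Keep in mind that with retries enabled and several in-flight requests per connection, retried batches can arrive out of order unless idempotence is enabled.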
A very good blog post showing different scenarios in which you can lose messages in Kafka:
Side note - from this answer I found out that it is a good idea to flush the template on shutdown if you don't want to lose messages:
@PreDestroy
public void flush()
{
    kafkaTemplate.flush();
}
Upvotes: 1