gshock

Reputation: 711

Java Spring Kafka Template producer lost messages on broker restart

I'm using spring-boot (2.1.6.RELEASE) with spring-kafka (2.2.7.RELEASE), and I'm sending messages to my Kafka cluster using KafkaTemplate. But sometimes (usually when I restart a Kafka broker or do a rebalance) I see errors like this when sending messages:

org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.

Given the default Kafka producer configs, I expect failed sends to be retried, but they are not. The relevant defaults are:

retries: 2147483647  (https://kafka.apache.org/documentation/#retries)
acks: 1               (https://kafka.apache.org/documentation/#acks)

My config is this:

@Bean
public Map<String, Object> producerConfigs()
{
    // See https://kafka.apache.org/documentation/#producerconfigs for more properties
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
    return props;
}

@Bean
public ProducerFactory<Long, String> producerFactory()
{
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public KafkaTemplate<Long, String> kafkaTemplate(KafkaTemplateProducerListener<Long, String> kafkaTemplateProducerListener,
                                                 ProducerFactory<Long, String> producerFactory)
{
    KafkaTemplate<Long, String> kafkaTemplate = new KafkaTemplate<>(producerFactory);
    kafkaTemplate.setProducerListener(kafkaTemplateProducerListener);
    return kafkaTemplate;
}

and I'm sending messages like this:

kafkaTemplate.send(topicName, key, body);
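
For completeness: send() is asynchronous and returns a ListenableFuture, so a failure is only visible if you inspect the future. A minimal sketch of attaching a callback (log is just a placeholder logger; ListenableFutureCallback comes from org.springframework.util.concurrent and SendResult from org.springframework.kafka.support):

ListenableFuture<SendResult<Long, String>> future = kafkaTemplate.send(topicName, key, body);
future.addCallback(new ListenableFutureCallback<SendResult<Long, String>>() {
    @Override
    public void onSuccess(SendResult<Long, String> result) {
        // the record was acknowledged by the broker
    }

    @Override
    public void onFailure(Throwable ex) {
        // the send (after any client-side retries) ultimately failed
        log.error("Failed to send record with key {}", key, ex);
    }
});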

I have searched all over the internet, and everyone says that this configuration with these retries and acks settings must work, but it doesn't. What am I missing?

Thanks

Upvotes: 3

Views: 2282

Answers (1)

gshock

Reputation: 711

After spending some time debugging this, I found the solution:

props.put(ProducerConfig.ACKS_CONFIG, "all");

For more information about this property: https://kafka.apache.org/documentation/#acks
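
Concretely, that is just the producerConfigs() bean from the question with the extra property. A minimal sketch (the explicit retries line is optional and is only there so the retry behaviour does not depend on the client version's default):

@Bean
public Map<String, Object> producerConfigs()
{
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
    props.put(ProducerConfig.ACKS_CONFIG, "all");                // wait for all in-sync replicas to acknowledge
    props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // optional: pin retries instead of relying on the default
    return props;
}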


A very good blog post showing different scenarios in which you can lose messages in Kafka:



Side note - from this answer I found out that it is a good idea to use this if you don't want to lose messages on shutdown:

@PreDestroy
public void flush()
{
    // send anything still buffered in the producer before the application shuts down
    kafkaTemplate.flush();
}
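
For what it's worth, flush() blocks until every record previously handed to send() has completed (successfully or not), which is why calling it from @PreDestroy helps avoid dropping records that are still sitting in the producer's buffer when the application stops.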

Upvotes: 1
