nickorfas

Spring Kafka - Transactionally publish message and commit record offset via the Listener Container fails on retry after transaction rollback

I am using Spring Boot and spring-kafka (see pom.xml for the specific versions) with Apache Kafka v2.0.1, and I am facing a weird issue when using transactions via the listener container.

The issue arises when Kafka returns an error while publishing a new message during the consume-process-produce cycle. To emulate it, I set the topic's min.insync.replicas setting (4) higher than the number of available Kafka brokers (3).

Because of factory.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<String, String>(-1));, I was expecting the transaction to be rolled back on every unsuccessful attempt and the record to be retried indefinitely.

As you can see in output.log, when processing fails on the 1st attempt, the listener container rolls back the transaction and then re-delivers the same message (as expected).

However, on the 2nd attempt to process and publish the new message, it blocks forever on kafkaTemplate.send(record).get(); and no transaction rollback is initiated. Incidentally, if at that point I set min.insync.replicas back to an acceptable value (<= the number of brokers), processing continues normally and the transaction is committed.
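
For what it's worth, bounding the wait on the send future should at least turn the hang into an exception so the container can attempt a rollback. Below is a sketch of that workaround only (the 30-second timeout is an arbitrary value); it obviously does not address whatever is blocking the producer:

@KafkaListener(topics = "test-kafka-transactions")
public void messageListener(String value) throws Exception {
    ProducerRecord<String, String> record = new ProducerRecord<>("test-kafka-transactions", null, value);
    try {
        // Wait at most 30 seconds (arbitrary) instead of blocking indefinitely.
        kafkaTemplate.send(record).get(30, java.util.concurrent.TimeUnit.SECONDS);
    } catch (java.util.concurrent.TimeoutException e) {
        // The listener fails, so the container rolls back the transaction and
        // re-seeks the record; the producer itself may still be wedged.
        log.warn("Send did not complete in time, letting the transaction roll back", e);
        throw e;
    }
}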

To reproduce it, publish a string message to the topic, raise the topic's min.insync.replicas to an unsatisfiable value and run the application.
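
Any way of raising the topic override works; for completeness, here is a sketch of doing it programmatically with the plain AdminClient (the class name is just a placeholder, and note that alterConfigs replaces any existing topic-level overrides):

package com.project.test;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class BumpMinInsyncReplicas {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "test-kafka-transactions");
            // 4 in-sync replicas can never be satisfied with only 3 brokers,
            // so every produce fails with NOT_ENOUGH_REPLICAS.
            Config config = new Config(Collections.singleton(new ConfigEntry("min.insync.replicas", "4")));
            // Caution: alterConfigs replaces the topic's existing overrides.
            admin.alterConfigs(Collections.singletonMap(topic, config)).all().get();
        }
    }
}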

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
    </parent>
    <groupId>com.project.test</groupId>
    <artifactId>kafka-transactions</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Application.java

package com.project.test;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;

@EnableKafka
@EnableTransactionManagement
@SpringBootApplication
public class Application {

    private String kafkaBootstrapServers = "127.0.0.1:9092";

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        configProps.put(ProducerConfig.RETRIES_CONFIG, "1");

        DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(configProps);
        producerFactory.setTransactionIdPrefix("tx.");

        return producerFactory;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public KafkaTransactionManager<String, String> kafkaTransactionManager() {
        KafkaTransactionManager<String, String> manager = new KafkaTransactionManager<String, String>(producerFactory());
        return manager;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-kafka-tx-consumer");

        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        configProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(1);

        factory.getContainerProperties().setTransactionManager(kafkaTransactionManager());
        factory.getContainerProperties().setAckMode(AckMode.RECORD);
        factory.getContainerProperties().setSyncCommits(true);
        factory.getContainerProperties().setCommitLogLevel(org.springframework.kafka.support.LogIfLevelEnabled.Level.INFO);

        factory.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<String, String>(-1));

        return factory;
    }

}

KafkaTransactions.java

package com.project.test;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaTransactions {

    private static final Logger log = LoggerFactory.getLogger(KafkaTransactions.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @KafkaListener(topics = "test-kafka-transactions")
    public void messageListener(String value) throws Exception {

        log.info("Received message");

        ProducerRecord<String, String> record = new ProducerRecord<>("test-kafka-transactions", null, value);

        Thread.sleep(2000);

        log.info("Adding new message on Kafka transaction (commit is handled by the Listener Container)");
        kafkaTemplate.send(record).get();

        log.info("Processed message");
    }

}

output.log

2019-04-04 10:21:25.741  INFO 19316 --- [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [test-kafka-transactions-3, test-kafka-transactions-2, test-kafka-transactions-1, test-kafka-transactions-0]
2019-04-04 10:23:03.240  INFO 19316 --- [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] com.project.test.KafkaTransactions       : Received message
2019-04-04 10:23:05.245  INFO 19316 --- [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] com.project.test.KafkaTransactions       : Adding new message on Kafka transaction (commit is handled by the Listener Container)
2019-04-04 10:23:05.247  INFO 19316 --- [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.Metadata        : Cluster ID: gvDhAK6YRsWzh2FrxukHnA
2019-04-04 10:23:05.267  WARN 19316 --- [kafka-producer-network-thread | producer-1] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-1, transactionalId=tx.test-kafka-tx-consumer.test-kafka-transactions.3] Got error produce response with correlation id 12 on topic-partition test-kafka-transactions-3, retrying (0 attempts left). Error: NOT_ENOUGH_REPLICAS
2019-04-04 10:23:05.372 ERROR 19316 --- [kafka-producer-network-thread | producer-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='1' and payload='1' to topic test-kafka-transactions:

org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.

2019-04-04 10:23:05.372 ERROR 19316 --- [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Transaction rolled back

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.project.test.KafkaTransactions.messageListener(java.lang.String) throws java.lang.Exception' threw exception; nested exception is java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:302) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1224) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1217) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1178) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$1600(KafkaMessageListenerContainer.java:384) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$3.doInTransactionWithoutResult(KafkaMessageListenerContainer.java:1128) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.transaction.support.TransactionCallbackWithoutResult.doInTransaction(TransactionCallbackWithoutResult.java:36) ~[spring-tx-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:1118) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1096) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:934) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:750) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_191]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_191]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_191]
Caused by: java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
    at java.util.concurrent.FutureTask.report(FutureTask.java:122) [na:1.8.0_191]
    at java.util.concurrent.FutureTask.get(FutureTask.java:192) [na:1.8.0_191]
    at org.springframework.util.concurrent.SettableListenableFuture.get(SettableListenableFuture.java:119) ~[spring-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at com.project.test.KafkaTransactions.messageListener(KafkaTransactions.java:29) ~[classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_191]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_191]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_191]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_191]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:170) ~[spring-messaging-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    ... 17 common frames omitted
Caused by: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
    at org.springframework.kafka.core.KafkaTemplate.lambda$buildCallback$0(KafkaTemplate.java:396) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1235) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:635) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:604) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:561) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:485) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:700) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:532) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:524) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) ~[kafka-clients-2.0.1.jar:na]
    ... 1 common frames omitted
Caused by: org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.

2019-04-04 10:23:05.372  INFO 19316 --- [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] com.project.test.KafkaTransactions       : Received message
2019-04-04 10:23:07.391  INFO 19316 --- [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] com.project.test.KafkaTransactions       : Adding new message on Kafka transaction (commit is handled by the Listener Container)


Answers (1)

Gary Russell

I was able to reproduce it after taking Spring out of the picture.

I submitted a bug: KAFKA-8195.
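
For anyone who wants to try this without Spring, a minimal transactional produce loop along the following lines exercises the same send-fail / abort / retry path (a sketch, not the exact reproducer attached to the ticket; the transactional id and class name are placeholders, and the consumer side is omitted since the hang is on the producer):

package com.project.test;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class PlainTransactionalProducer {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx.plain-test");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            while (true) {
                producer.beginTransaction();
                try {
                    producer.send(new ProducerRecord<>("test-kafka-transactions", "value")).get();
                    producer.commitTransaction();
                    break;
                } catch (Exception e) {
                    // While min.insync.replicas is unsatisfiable, the send fails
                    // with NOT_ENOUGH_REPLICAS; abort and retry in a new transaction.
                    System.err.println("Send failed, aborting transaction: " + e);
                    producer.abortTransaction();
                }
            }
        }
    }
}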
