Reputation: 1
I am using spring-boot & spring-kafka (see pom.xml for specific versions) with Apache Kafka v2.0.1 and I am facing a weird issue when using transactions via Listener Container.
The issue is raised when there is an error originated by Kafka while trying to publish new message on consume-process-produce cycle. To emulate it, I set topic's min.insync.replicas setting (4) greater than Kafka's available brokers (3).
I was expecting that due to factory.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<String, String>(-1));
, there would be transaction rollback on each unsuccessful attempt and infinite retries.
As you may see on the output.log
, when processing fails on the 1st try (message receival), the listener container initiates the transaction rollback and then it receives again the same message (as expected).
However, while trying for the 2nd time to process & publish the new message, it blocks forever on kafkaTemplate.send(record).get();
and no transaction rollback is initiated... By the way, at that point, if I set back my min.insync.replicas setting to an acceptable value (<=brokers), the process continues normally and the transaction is committed.
In order to emulate it, just publish a string message to the topic, increase topic's min.insync.replicas to unacceptable value and run the application.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.3.RELEASE</version>
</parent>
<groupId>com.project.test</groupId>
<artifactId>kafka-transactions</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Application.java
package com.project.test;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
@EnableKafka
@EnableTransactionManagement
@SpringBootApplication
public class Application {
private String kafkaBootstrapServers = "127.0.0.1:9092";
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
configProps.put(ProducerConfig.ACKS_CONFIG, "all");
configProps.put(ProducerConfig.RETRIES_CONFIG, "1");
DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(configProps);
producerFactory.setTransactionIdPrefix("tx.");
return producerFactory;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public KafkaTransactionManager<String, String> kafkaTransactionManager() {
KafkaTransactionManager<String, String> manager = new KafkaTransactionManager<String, String>(producerFactory());
return manager;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-kafka-tx-consumer");
configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
configProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(1);
factory.getContainerProperties().setTransactionManager(kafkaTransactionManager());
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.getContainerProperties().setSyncCommits(true);
factory.getContainerProperties().setCommitLogLevel(org.springframework.kafka.support.LogIfLevelEnabled.Level.INFO);
factory.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<String, String>(-1));
return factory;
}
}
KafkaTransactions.java
package com.project.test;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class KafkaTransactions {
private static final Logger log = LoggerFactory.getLogger(KafkaTransactions.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@KafkaListener(topics = "test-kafka-transactions")
public void messageListener(String value) throws Exception {
log.info("Received message");
ProducerRecord<String, String> record = new ProducerRecord<>("test-kafka-transactions", null, value);
Thread.sleep(2000);
log.info("Adding new message on Kafka transaction (commit is handled by the Listener Container)");
kafkaTemplate.send(record).get();
log.info("Processed message");
}
}
output.log
2019-04-04 10:21:25.741 INFO 19316 --- [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [test-kafka-transactions-3, test-kafka-transactions-2, test-kafka-transactions-1, test-kafka-transactions-0]
2019-04-04 10:23:03.240 INFO 19316 --- [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] com.project.test.KafkaTransactions : Received message
2019-04-04 10:23:05.245 INFO 19316 --- [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] com.project.test.KafkaTransactions : Adding new message on Kafka transaction (commit is handled by the Listener Container)
2019-04-04 10:23:05.247 INFO 19316 --- [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.Metadata : Cluster ID: gvDhAK6YRsWzh2FrxukHnA
2019-04-04 10:23:05.267 WARN 19316 --- [kafka-producer-network-thread | producer-1] o.a.k.clients.producer.internals.Sender : [Producer clientId=producer-1, transactionalId=tx.test-kafka-tx-consumer.test-kafka-transactions.3] Got error produce response with correlation id 12 on topic-partition test-kafka-transactions-3, retrying (0 attempts left). Error: NOT_ENOUGH_REPLICAS
2019-04-04 10:23:05.372 ERROR 19316 --- [kafka-producer-network-thread | producer-1] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='1' and payload='1' to topic test-kafka-transactions:
org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
2019-04-04 10:23:05.372 ERROR 19316 --- [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Transaction rolled back
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.project.test.KafkaTransactions.messageListener(java.lang.String) throws java.lang.Exception' threw exception; nested exception is java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:302) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1224) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1217) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1178) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$1600(KafkaMessageListenerContainer.java:384) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$3.doInTransactionWithoutResult(KafkaMessageListenerContainer.java:1128) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.transaction.support.TransactionCallbackWithoutResult.doInTransaction(TransactionCallbackWithoutResult.java:36) ~[spring-tx-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:1118) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1096) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:934) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:750) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_191]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_191]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_191]
Caused by: java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
at java.util.concurrent.FutureTask.report(FutureTask.java:122) [na:1.8.0_191]
at java.util.concurrent.FutureTask.get(FutureTask.java:192) [na:1.8.0_191]
at org.springframework.util.concurrent.SettableListenableFuture.get(SettableListenableFuture.java:119) ~[spring-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at com.project.test.KafkaTransactions.messageListener(KafkaTransactions.java:29) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_191]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_191]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_191]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_191]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:170) ~[spring-messaging-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
... 17 common frames omitted
Caused by: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
at org.springframework.kafka.core.KafkaTemplate.lambda$buildCallback$0(KafkaTemplate.java:396) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1235) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:635) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:604) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:561) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:485) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:700) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:532) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:524) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) ~[kafka-clients-2.0.1.jar:na]
... 1 common frames omitted
Caused by: org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
2019-04-04 10:23:05.372 INFO 19316 --- [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] com.project.test.KafkaTransactions : Received message
2019-04-04 10:23:07.391 INFO 19316 --- [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] com.project.test.KafkaTransactions : Adding new message on Kafka transaction (commit is handled by the Listener Container)
Upvotes: 0
Views: 2692
Reputation: 174689
I was able to reproduce it after taking Spring out of the picture.
I submitted a bug KAFKA-8195.
Upvotes: 1