
Reputation: 1797

Spring Kafka ChainedKafkaTransactionManager doesn't synchronize with JPA Spring-data transaction

I read a ton of Gary Russell answers and posts, but didn't find actual solution for the common use-case for synchronization of the sequence below:

recieve from topic A => save to DB via Spring-data => send to topic B

As i understand properly: there is no guarantee for fully atomic processing in that case and i need to deal with messages deduplication on the client side, but the main issue is that ChainedKafkaTransactionManager doesn't synchronize with JpaTransactionManager (see @KafkaListener below)

Kafka config:

public class KafkaConfig {

    private static final Logger log = LoggerFactory.getLogger(KafkaConfig.class);

    public ConsumerFactory<String, byte[]> commonConsumerFactory(@Value("${kafka.broker}") String bootstrapServer) {

        Map<String, Object> props = new HashMap<>();
        props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);

        props.put(AUTO_OFFSET_RESET_CONFIG, 'earliest');
        props.put(SESSION_TIMEOUT_MS_CONFIG, 10000);
        props.put(ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(MAX_POLL_RECORDS_CONFIG, 10);
        props.put(MAX_POLL_INTERVAL_MS_CONFIG, 17000);
        props.put(FETCH_MIN_BYTES_CONFIG, 1048576);
        props.put(FETCH_MAX_WAIT_MS_CONFIG, 1000);
        props.put(ISOLATION_LEVEL_CONFIG, 'read_committed');

        props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(props);

    public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerContainerFactory(
            @Qualifier("commonConsumerFactory") ConsumerFactory<String, byte[]> consumerFactory,
            @Qualifier("chainedKafkaTM") ChainedKafkaTransactionManager chainedKafkaTM,
            @Qualifier("kafkaTemplate") KafkaTemplate<String, byte[]> kafkaTemplate,
            @Value("${kafka.concurrency:#{T(java.lang.Runtime).getRuntime().availableProcessors()}}") Integer concurrency
    ) {

        ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();

        var arbp = new DefaultAfterRollbackProcessor<String, byte[]>(new FixedBackOff(1000L, 3));



        return factory;

    public ProducerFactory<String, byte[]> producerFactory(@Value("${kafka.broker}") String bootstrapServer) {

        Map<String, Object> configProps = new HashMap<>();

        configProps.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);

        configProps.put(BATCH_SIZE_CONFIG, 16384);
        configProps.put(ENABLE_IDEMPOTENCE_CONFIG, true);

        configProps.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

        var kafkaProducerFactory = new DefaultKafkaProducerFactory<String, byte[]>(configProps);

        return kafkaProducerFactory;

    public KafkaTemplate<String, byte[]> kafkaTemplate(@Qualifier("producerFactory") ProducerFactory<String, byte[]> producerFactory) {
        return new KafkaTemplate<>(producerFactory);

    public KafkaTransactionManager kafkaTransactionManager(@Qualifier("producerFactory") ProducerFactory<String, byte[]> producerFactory) {
        KafkaTransactionManager ktm = new KafkaTransactionManager<>(producerFactory);
        return ktm;

    public ChainedKafkaTransactionManager chainedKafkaTM(JpaTransactionManager jpaTransactionManager,
                                                         KafkaTransactionManager kafkaTransactionManager) {
        return new ChainedKafkaTransactionManager(kafkaTransactionManager, jpaTransactionManager);

    @Bean(name = "transactionManager")
    public JpaTransactionManager transactionManager(EntityManagerFactory em) {
        return new JpaTransactionManager(em);

Kafka listener:

@KafkaListener(groupId = "${group.id}", idIsGroup = false, topics = "${topic.name.import}")
public void consume(List<byte[]> records, @Header(KafkaHeaders.OFFSET) Long offset) {
    for (byte[] record : records) {
        // cause infinity rollback (perhaps due to batch listener)
        if (true)
            throw new RuntimeExcetion("foo");

        // spring-data storage with @Transactional("chainedKafkaTM"), since Spring-data can't determine TM among transactionManager, chainedKafkaTM, kafkaTransactionManager
        var result = storageService.persist(record);


Spring-kafka version: 2.3.3 Spring-boot version: 2.2.1

What is a proper way to implement such use-case ? Spring-kafka documentation limited only to small/specific examples.

P.s. when i'm using @Transactional(transactionManager = "chainedKafkaTM", rollbackFor = Exception.class) on @KafkaListener method i am facing endless cyclic rollback, however FixedBackOff(1000L, 3L) is set.

EDIT: i'm planning to achieve max affordable synchronization between listener, producer and database with configurable retries num.

EDIT: Code snippets above edited with respect to advised configuration. Using ARBP doesn't solve infinity rollback cycle for me, since the first statement's predicate is always false (SeekUtils.doSeeks):

    public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer, Exception exception,
            boolean recoverable) {

        if (SeekUtils.doSeeks(((List) records), consumer, exception, recoverable,
                getSkipPredicate((List) records, exception), LOGGER)
                    && isCommitRecovered() && this.kafkaTemplate != null && this.kafkaTemplate.isTransactional()) {
            ConsumerRecord<K, V> skipped = records.get(0);
                    Collections.singletonMap(new TopicPartition(skipped.topic(), skipped.partition()),
                            new OffsetAndMetadata(skipped.offset() + 1)));

It is worth saying that there is no active transaction in Kafka Consumer method (TransactionSynchronizationManager.isActualTransactionActive()).

Upvotes: 4

Views: 5372

Answers (1)

Gary Russell
Gary Russell

Reputation: 174554

What makes you think it's not synchronized? You really don't need @Transactional since the container will start both transactions.

You shouldn't use a SeekToCurrentErrorHandler with transactions because that occurs within the transaction. Configure the after rollback processor instead. The default ARBP uses a FixedBackOff(0L, 9) (10 attempts).

This works fine for me; and stops after 4 delivery attempts:

public class So58804826Application {

    public static void main(String[] args) {
        SpringApplication.run(So58804826Application.class, args);

    public JpaTransactionManager transactionManager() {
        return new JpaTransactionManager();

    public ChainedKafkaTransactionManager<?, ?> chainedTxM(JpaTransactionManager jpa,
            KafkaTransactionManager<?, ?> kafka) {

        return new ChainedKafkaTransactionManager<>(kafka, jpa);

    private Saver saver;

    @KafkaListener(id = "so58804826", topics = "so58804826")
    public void listen(String in) {
        System.out.println("Storing: " + in);

    public NewTopic topic() {
        return TopicBuilder.name("so58804826")

    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
//          template.executeInTransaction(t -> t.send("so58804826", "foo"));


class ContainerFactoryConfigurer {

    ContainerFactoryConfigurer(ConcurrentKafkaListenerContainerFactory<?, ?> factory,
            ChainedKafkaTransactionManager<?, ?> tm) {

        factory.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>(new FixedBackOff(1000L, 3)));


class Saver {

    private MyEntityRepo repo;

    private final AtomicInteger ids = new AtomicInteger();

    public void save(String in) {
        this.repo.save(new MyEntity(in, this.ids.incrementAndGet()));
        throw new RuntimeException("foo");


I see "Participating in existing transaction" from both TxMs.

and with @Transactional("transactionManager"), I just get it from the JPATm, as one would expect.


There is no concept of "recovery" for a batch listener - the framework has no idea which record in the batch needs to be skipped. In 2.3, we added a new feature for batch listeners when using MANUAL ack modes.

See Committing Offsets.

Starting with version 2.3, the Acknowledgment interface has two additional methods nack(long sleep) and nack(int index, long sleep). The first one is used with a record listener, the second with a batch listener. Calling the wrong method for your listener type will throw an IllegalStateException.

When using a batch listener, you can specify the index within the batch where the failure occurred. When nack() is called, offsets will be committed for records before the index and seeks are performed on the partitions for the failed and discarded records so that they will be redelivered on the next poll(). This is an improvement over the SeekToCurrentBatchErrorHandler, which can only seek the entire batch for redelivery.

However, the failed record will still be replayed indefinitely.

You could keep track of the record that keeps failing and nack index + 1 to skip over it.

However, since your JPA tx has rolled back; this won't work for you.

With batch listener's you must handle problems with batches in your listener code.

Upvotes: 4

Related Questions