riteshg

Reputation: 31

RabbitTransactionManager not rolling back in ChainedTransactionManager when an error occurs

I'm trying to use a single transaction manager (ChainedTransactionManager) for Rabbit and Kafka by chaining a RabbitTransactionManager and a KafkaTransactionManager. The goal is a best-effort one-phase commit.

To test it, the transactional method throws an exception after the two operations (sending a message to a Rabbit exchange and publishing an event to Kafka). When running the test, the logs suggest that a rollback is initiated, but the message ends up in Rabbit anyway.

Here's the method where the problem occurs:

@Transactional
public void processMessageAndEvent() {
    Message<String> message = MessageBuilder
            .withPayload("Message to RabbitMQ")
            .build();
    outputToRabbitMQExchange.output().send(message);

    outputToKafkaTopic.output().send(
            withPayload("Message to Kafka")
                    .setHeader(KafkaHeaders.MESSAGE_KEY, "Kafka message key")
                    .build()
    );

    throw new RuntimeException("We want the previous changes to rollback");
}

Here is the main Spring Boot application configuration:

@SpringBootApplication
@EnableTransactionManagement
public class MyApplication {
    public static void main(String[] args) {
        SpringApplication.run(MyApplication.class, args);
    }
}

Here is the transaction manager configuration:

@Bean
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory cf) {
    return new RabbitTransactionManager(cf);
}

@Bean(name = "transactionManager")
@Primary
public ChainedTransactionManager chainedTransactionManager(RabbitTransactionManager rtm, BinderFactory binders) {
    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder("kafka", MessageChannel.class))
            .getTransactionalProducerFactory();
    KafkaTransactionManager<byte[], byte[]> ktm = new KafkaTransactionManager<>(pf);
    ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
    return new ChainedKafkaTransactionManager<>(ktm, rtm);
}

And finally, the relevant configuration in the application.yml file:

spring:
  application:
    name: my-application
  main:
    allow-bean-definition-overriding: true
  cloud:
    stream:
      bindings:
        source_outputToRabbitMQExchange:
          content-type: application/json
          destination: outputToRabbitMQExchange
          group: ${spring.application.name}
        sink_outputToKafkaTopic:
          content-type: application/json
          destination: outputToKafkaTopic
          binder: kafka
      rabbit:
        bindings:
          output_outputToRabbitMQExchange:
            producer:
              transacted: true
              routing-key-expression: headers.myKey
      kafka:
        bindings:
          sink_outputToKafkaTopic:
            producer:
              transacted: true
        binder:
          brokers: ${...kafka.hostname}
          transaction:
            transaction-id-prefix: ${CF_INSTANCE_INDEX}.${spring.application.name}.T
      default-binder: rabbit

  kafka:
    producer:
      properties:
        max.block.ms: 3000
        transaction.timeout.ms: 5000
        enable.idempotence: true
        retries: 1
        acks: all
    bootstrap-servers: ${...kafka.hostname}

When we execute the method, we can see the message is still in Rabbit despite the logs saying the transaction is to be rolled back.

Is there anything we could be missing or have misunderstood?

Upvotes: 0

Views: 356

Answers (1)

Gary Russell

Reputation: 174719

@EnableBinding is deprecated in favor of the newer functional programming model.

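That said, I copied your code and config pretty much as-is (transacted is not a Kafka producer binding property, so I removed it) and it works fine for me (Boot 2.4.5, Cloud 2020.0.2). Note that in the YAML below, the Rabbit producer properties are keyed by source_outputToRabbitMQExchange, matching the binding name, rather than output_outputToRabbitMQExchange as in the question.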

@SpringBootApplication
@EnableTransactionManagement
@EnableBinding(Bindings.class)
public class So67297869Application {

    public static void main(String[] args) {
        SpringApplication.run(So67297869Application.class, args);
    }

    @Bean
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory cf) {
        return new RabbitTransactionManager(cf);
    }

    @Bean(name = "transactionManager")
    @Primary
    public ChainedTransactionManager chainedTransactionManager(RabbitTransactionManager rtm, BinderFactory binders) {
        ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder("kafka",
                MessageChannel.class))
                        .getTransactionalProducerFactory();
        KafkaTransactionManager<byte[], byte[]> ktm = new KafkaTransactionManager<>(pf);
        ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return new ChainedKafkaTransactionManager<>(ktm, rtm);
    }

    @Bean
    public ApplicationRunner runner(Foo foo) {
        return args -> {
            foo.send("test");
        };
    }

}

interface Bindings {

    @Output("source_outputToRabbitMQExchange")
    MessageChannel rabbitOut();

    @Output("sink_outputToKafkaTopic")
    MessageChannel kafkaOut();

}

@Component
class Foo {

    @Autowired
    Bindings bindings;

    @Transactional
    public void send(String in) {
        bindings.rabbitOut().send(MessageBuilder.withPayload(in)
                .setHeader("myKey", "test")
                .build());
        bindings.kafkaOut().send(MessageBuilder.withPayload(in)
                .setHeader(KafkaHeaders.MESSAGE_KEY, "test".getBytes())
                .build());
        throw new RuntimeException("fail");
    }

}

spring:
  application:
    name: my-application
  main:
    allow-bean-definition-overriding: true
  cloud:
    stream:
      bindings:
        source_outputToRabbitMQExchange:
          content-type: application/json
          destination: outputToRabbitMQExchange
          group: ${spring.application.name}
        sink_outputToKafkaTopic:
          content-type: application/json
          destination: outputToKafkaTopic
          binder: kafka
      rabbit:
        bindings:
          source_outputToRabbitMQExchange:
            producer:
              transacted: true
              routing-key-expression: headers.myKey
      kafka:
        binder:
          brokers: localhost:9092
          transaction:
            transaction-id-prefix: foo.${spring.application.name}.T
      default-binder: rabbit

  kafka:
    producer:
      properties:
        max.block.ms: 3000
        transaction.timeout.ms: 5000
        enable.idempotence: true
        retries: 1
        acks: all
    bootstrap-servers: localhost:9092

logging:
  level:
    org.springframework.transaction: debug
    org.springframework.kafka: debug
    org.springframework.amqp.rabbit: debug

2021-04-28 09:35:32.488 DEBUG 53253 --- [           main] o.s.a.r.t.RabbitTransactionManager       : Initiating transaction rollback
2021-04-28 09:35:32.489 DEBUG 53253 --- [           main] o.s.a.r.connection.RabbitResourceHolder  : Rolling back messages to channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,2), conn: Proxy@3c770db4 Shared Rabbit Connection: SimpleConnection@1f736d00 [delegate=amqp://[email protected]:5672/, localPort= 63439]
2021-04-28 09:35:32.490 DEBUG 53253 --- [           main] o.s.a.r.t.RabbitTransactionManager       : Resuming suspended transaction after completion of inner transaction
2021-04-28 09:35:32.490 DEBUG 53253 --- [           main] o.s.k.t.KafkaTransactionManager          : Initiating transaction rollback
2021-04-28 09:35:32.490 DEBUG 53253 --- [           main] o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@38e83838] abortTransaction()

And there is no message in the queue that I bound to the exchange with routing key #.
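
The order in the log above (Rabbit rolled back first, then Kafka) is consistent with how ChainedTransactionManager is documented to behave: transactions are started in the order the managers are given and committed or rolled back in reverse order. The following is only a plain-Java sketch of that ordering, not Spring's implementation; the class ChainedRollbackSketch, the Resource type and the run method are hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChainedRollbackSketch {

    /** Hypothetical stand-in for a single-resource transaction manager. */
    static class Resource {
        final String name;
        final List<String> log;

        Resource(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        void begin()    { log.add("begin " + name); }
        void commit()   { log.add("commit " + name); }
        void rollback() { log.add("rollback " + name); }
    }

    /**
     * Begins a transaction on each named resource in the given order, runs the
     * work, then commits (on success) or rolls back (on RuntimeException) in
     * reverse order. Returns the recorded sequence of operations.
     */
    public static List<String> run(List<String> names, Runnable work) {
        List<String> log = new ArrayList<>();
        List<Resource> started = new ArrayList<>();
        for (String name : names) {
            Resource r = new Resource(name, log);
            r.begin();
            started.add(r);
        }

        boolean succeeded = true;
        try {
            work.run();
        } catch (RuntimeException e) {
            succeeded = false; // mimic rollback-on-unchecked-exception
        }

        // Unwind in reverse: the last manager in the chain completes first.
        for (int i = started.size() - 1; i >= 0; i--) {
            if (succeeded) {
                started.get(i).commit();
            } else {
                started.get(i).rollback();
            }
        }
        return log;
    }

    public static void main(String[] args) {
        // Same order as new ChainedKafkaTransactionManager<>(ktm, rtm).
        List<String> log = run(Arrays.asList("kafka", "rabbit"),
                () -> { throw new RuntimeException("fail"); });
        log.forEach(System.out::println);
    }
}
```

With the managers listed as (kafka, rabbit), the sketch records the Rabbit rollback before the Kafka one. In a best-effort one-phase commit the same reverse ordering applies to commits, so a failure while committing the second resource cannot undo a first resource that has already committed.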

What versions are you using?

EDIT

And here is the equivalent app after removing the deprecations, using the functional model and StreamBridge (same yaml):

@SpringBootApplication
@EnableTransactionManagement
public class So67297869Application {

    public static void main(String[] args) {
        SpringApplication.run(So67297869Application.class, args);
    }

    @Bean
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory cf) {
        return new RabbitTransactionManager(cf);
    }

    @Bean(name = "transactionManager")
    @Primary
    public ChainedTransactionManager chainedTransactionManager(RabbitTransactionManager rtm, BinderFactory binders) {
        ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder("kafka",
                MessageChannel.class))
                        .getTransactionalProducerFactory();
        KafkaTransactionManager<byte[], byte[]> ktm = new KafkaTransactionManager<>(pf);
        ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return new ChainedKafkaTransactionManager<>(ktm, rtm);
    }

    @Bean
    public ApplicationRunner runner(Foo foo) {
        return args -> {
            foo.send("test");
        };
    }

}

@Component
class Foo {

    @Autowired
    StreamBridge bridge;

    @Transactional
    public void send(String in) {
        bridge.send("source_outputToRabbitMQExchange", MessageBuilder.withPayload(in)
                .setHeader("myKey", "test")
                .build());
        bridge.send("sink_outputToKafkaTopic", MessageBuilder.withPayload(in)
                .setHeader(KafkaHeaders.MESSAGE_KEY, "test".getBytes())
                .build());
        throw new RuntimeException("fail");
    }

}

Upvotes: 1
