gadams00
gadams00

Reputation: 779

spring-cloud-stream producer transactionality

I've done a little testing with the kafka binder and it appears that spring-cloud-stream producers don't participate in spring-managed transactions.

Given code like

@RequestMapping(method = RequestMethod.POST)
    @Transactional
    public Customer insertCustomer(@RequestBody Customer customer) {
        customerDao.insertCustomer(customer);
        source.output().send(MessageBuilder.withPayload(CustomerEventHelper.createSaveEvent(customer)).build());
        if (true) {
            throw new RuntimeException("rollback test");
        }
        return customer;
    }

the customerDao.insertCustomer call is rolled back, but the kafka message was still sent. If I have a consumer on the customer event that inserts the customer into a data warehouse, the data warehouse and system of record will be out of synch on transation rollback. Is there a way to make the kafka binder transactional here?

Upvotes: 3

Views: 2203

Answers (1)

Marius Bogoevici
Marius Bogoevici

Reputation: 2400

The Kafka binder is not transactional, and Kafka does not support transactions in general.

We do intend to address transaction management for Spring Cloud Stream 1.1: https://github.com/spring-cloud/spring-cloud-stream/issues/536.

However, you can even currently send messages only after a successful commit by registering a transaction synchronization like this:

TransactionSynchronizationManager.registerSynchronization(
   new TransactionSynchronization(){
       void afterCommit(){                     
           source.output().send(MessageBuilder.withPayload(event).build());
    if (true) {

       }
});

See http://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/transaction/support/TransactionSynchronization.html

Upvotes: 4

Related Questions