Reputation: 779
I've done a little testing with the Kafka binder, and it appears that spring-cloud-stream producers don't participate in Spring-managed transactions.
Given code like:
    @RequestMapping(method = RequestMethod.POST)
    @Transactional
    public Customer insertCustomer(@RequestBody Customer customer) {
        customerDao.insertCustomer(customer);
        source.output().send(MessageBuilder.withPayload(CustomerEventHelper.createSaveEvent(customer)).build());
        if (true) {
            throw new RuntimeException("rollback test");
        }
        return customer;
    }
the customerDao.insertCustomer call is rolled back, but the Kafka message is still sent. If I have a consumer on the customer event that inserts the customer into a data warehouse, the data warehouse and the system of record will be out of sync whenever the transaction rolls back. Is there a way to make the Kafka binder transactional here?
Upvotes: 3
Views: 2203
Reputation: 2400
The Kafka binder is not transactional, and Kafka itself did not support transactions when this was written (transactional producers were only added later, in Kafka 0.11).
We do intend to address transaction management for Spring Cloud Stream 1.1: https://github.com/spring-cloud/spring-cloud-stream/issues/536.
However, you can already make sure messages are sent only after a successful commit by registering a transaction synchronization, like this:
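    // Call this inside the @Transactional method, in place of the direct send.
    TransactionSynchronizationManager.registerSynchronization(
        new TransactionSynchronizationAdapter() {
            @Override
            public void afterCommit() {
                // Invoked only once the surrounding transaction has committed;
                // on rollback this callback never runs, so nothing is sent.
                source.output().send(MessageBuilder.withPayload(event).build());
            }
        });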
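To make the mechanism easier to see without a Spring context, here is a minimal plain-Java sketch of the same idea: callbacks are queued while the transaction is open and run only on commit. `MiniTransaction`, `inTransaction` and `run` are hypothetical names invented for this illustration. They are not Spring APIs, and the real `TransactionSynchronizationManager` does considerably more (thread-bound state, ordering, other callback phases).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class AfterCommitSketch {

    /** Hypothetical stand-in for a transaction that supports after-commit callbacks. */
    static class MiniTransaction {
        private final List<Runnable> afterCommit = new ArrayList<>();

        void registerAfterCommit(Runnable callback) {
            afterCommit.add(callback);
        }

        void commit() {
            // Copy first so a callback cannot modify the list while it is being iterated.
            List<Runnable> toRun = new ArrayList<>(afterCommit);
            afterCommit.clear();
            toRun.forEach(Runnable::run);
        }

        void rollback() {
            // Discard the queued callbacks: nothing is sent for a rolled-back transaction.
            afterCommit.clear();
        }
    }

    /** Commits if the work returns normally, rolls back if it throws a RuntimeException. */
    static void inTransaction(Consumer<MiniTransaction> work) {
        MiniTransaction tx = new MiniTransaction();
        try {
            work.accept(tx);
        } catch (RuntimeException e) {
            tx.rollback();
            throw e;
        }
        tx.commit();
    }

    /** Simulates the controller method; returns the messages that were actually "sent". */
    static List<String> run(boolean fail) {
        List<String> sent = new ArrayList<>();
        try {
            inTransaction(tx -> {
                // Stands in for source.output().send(...), deferred until commit.
                tx.registerAfterCommit(() -> sent.add("customer-saved"));
                if (fail) {
                    throw new RuntimeException("rollback test");
                }
            });
        } catch (RuntimeException expected) {
            // The transaction was rolled back, so the callback was dropped.
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints [customer-saved]
        System.out.println(run(true));  // prints []
    }
}
```

Note that this only orders the send after the commit. It is not an atomic transaction across the database and Kafka: if the send itself fails after the commit, the database change is already permanent.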
Upvotes: 4