Reputation: 31
I am trying to wrap my head around the following issue:
TL;DR How can I make sure a rollback occurs when the connection to the queue manager is lost during the prepare phase of the 2PC?
==========================================================
However, the message is not rolled back. I don't see any transaction logs (I believe this is expected, since the prepare did not finish), and there are no uncommitted messages on the queue or the back-out queue.
If I put a breakpoint in AbstractPollingMessageListenerContainer.receiveAndExecute, after the message is received but before the transaction is committed, I can see that the message is no longer on the queue. So it appears as if session.commit has already happened. How can I make sure a rollback occurs when the connection to the queue manager is lost during the prepare phase of the 2PC? I am probably missing something here, but I can't see what.
Upvotes: 1
Views: 793
Reputation: 31
After some more digging, I believe I found the issue. The message is now rolled back when I break the connection to the queue manager during the prepare phase of the 2PC. Hopefully this helps somebody else.
In my question I mentioned putting a breakpoint in AbstractPollingMessageListenerContainer.receiveAndExecute. In the Spring version I was using, 5.2.20.RELEASE, the code looked like this:
    if (this.transactionManager != null) {
        // Execute receive within transaction.
        TransactionStatus status = this.transactionManager.getTransaction(this.transactionDefinition);
        boolean messageReceived;
        try {
            messageReceived = doReceiveAndExecute(invoker, session, consumer, status);
        }
        catch (JMSException | RuntimeException | Error ex) {
            rollbackOnException(this.transactionManager, status, ex);
            throw ex;
        }
        this.transactionManager.commit(status);
        return messageReceived;
    }
This looked a bit odd, since the call to transactionManager.commit was not wrapped in a try-catch. So what happened if the commit failed?
The try-catch was added in 5.3.16 (see https://github.com/spring-projects/spring-framework/pull/1807), after which the code looks like this:
    if (this.transactionManager != null) {
        // Execute receive within transaction.
        TransactionStatus status = this.transactionManager.getTransaction(this.transactionDefinition);
        boolean messageReceived;
        try {
            messageReceived = doReceiveAndExecute(invoker, session, consumer, status);
        }
        catch (JMSException | RuntimeException | Error ex) {
            rollbackOnException(this.transactionManager, status, ex);
            throw ex;
        }
        try {
            this.transactionManager.commit(status);
        }
        catch (TransactionException ex) {
            // Propagate transaction system exceptions as infrastructure problems.
            throw ex;
        }
        catch (RuntimeException ex) {
            // Typically a late persistence exception from a listener-used resource
            // -> handle it as listener exception, not as an infrastructure problem.
            // E.g. a database locking failure should not lead to listener shutdown.
            handleListenerException(ex);
        }
        return messageReceived;
    }
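To make the difference between the two versions easier to see in isolation, here is a minimal, framework-free sketch of the control flow around the commit call. It is not Spring code: SketchTransactionManager, SketchTransactionException and the handledByListener list are hypothetical stand-ins I made up for illustration, and the sketch does not involve JMS, XA or a real transaction manager. It only shows which kind of commit failure propagates to the caller and which is routed to listener-level handling.

```java
import java.util.ArrayList;
import java.util.List;

public class CommitHandlingSketch {

    /** Hypothetical stand-in for a transaction-system failure (not Spring's TransactionException). */
    static class SketchTransactionException extends RuntimeException {
        SketchTransactionException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for a transaction manager; only commit() is modelled. */
    interface SketchTransactionManager {
        void commit();
    }

    /** Records exceptions that were routed to the listener-level handler. */
    static final List<RuntimeException> handledByListener = new ArrayList<>();

    /** Same shape as the older code: any failure in commit() propagates as-is. */
    static boolean receiveWithoutCommitHandling(SketchTransactionManager tm) {
        boolean messageReceived = true; // assume a message was received and processed
        tm.commit();
        return messageReceived;
    }

    /** Same shape as the newer code: commit() is wrapped and failures are classified. */
    static boolean receiveWithCommitHandling(SketchTransactionManager tm) {
        boolean messageReceived = true; // assume a message was received and processed
        try {
            tm.commit();
        }
        catch (SketchTransactionException ex) {
            // Transaction-system failure: propagate as an infrastructure problem.
            throw ex;
        }
        catch (RuntimeException ex) {
            // Any other late failure: treat it as a listener exception instead.
            handledByListener.add(ex);
        }
        return messageReceived;
    }

    public static void main(String[] args) {
        // A non-transaction failure during commit is handled and does not propagate.
        boolean received = receiveWithCommitHandling(() -> {
            throw new IllegalStateException("late failure during commit");
        });
        System.out.println("received=" + received + ", handled=" + handledByListener.size());

        // A transaction-system failure still propagates to the caller.
        try {
            receiveWithCommitHandling(() -> {
                throw new SketchTransactionException("prepare failed");
            });
        }
        catch (SketchTransactionException ex) {
            System.out.println("propagated: " + ex.getMessage());
        }
    }
}
```

In this sketch the older shape lets every commit failure escape the method unclassified, while the newer shape separates transaction-system failures from other runtime failures. How a real container and XA transaction manager react to each case depends on their configuration, which this sketch does not model.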
Upvotes: 2