Reputation: 320
These are all the requirements that need to be fulfilled
The chain in question is the following
<int:poller id="myInputChannelPoller">
<int:transactional transaction-manager="myTransactionManager" />
</int:poller>
<int:chain id="myChain" input-channel="myInputChannel">
<int:poller ref="myInputChannelPoller" />
<int:service-activator id="someMessageManipulation" />
<int:service-activator id="aDatabaseOperation" />
<int:service-activator id="sendJmsMessage" />
<int:service-activator id="anotherMessageManipulation" />
<int:service-activator id="anotherDatabaseOperation" />
</int:chain>
To wrap the chain in a transaction, I added the <int:transactional/> tag to the poller.
To have the JMS transaction committed before the database transaction, I used the ChainedTransactionManager:
@Bean
public PlatformTransactionManager myTransactionManager(JpaTransactionManager jpaTransactionManager, JmsTransactionManager jmsTransactionManager) {
// JPA before JMS, since the commits are done in reverse order
return new ChainedTransactionManager(jpaTransactionManager, jmsTransactionManager);
}
This also ensures that nothing is committed to the database when the JMS commit fails.
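To make the ordering argument concrete, here is a minimal plain-Java sketch of the reverse-order commit idea. It is a simplified model, not Spring's actual ChainedTransactionManager code: SimpleTx, RecordingTx, ReverseOrderCommitter and ChainedCommitDemo are hypothetical names used only for illustration, and the rollback handling is reduced to the bare minimum.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a transaction manager; not a Spring type. */
interface SimpleTx {
    void commit();
    void rollback();
}

/** Records what happens to it in a shared log; can be told to fail on commit. */
final class RecordingTx implements SimpleTx {
    private final String name;
    private final boolean failOnCommit;
    private final List<String> log;

    RecordingTx(String name, boolean failOnCommit, List<String> log) {
        this.name = name;
        this.failOnCommit = failOnCommit;
        this.log = log;
    }

    @Override
    public void commit() {
        if (failOnCommit) {
            log.add(name + " commit failed");
            throw new IllegalStateException(name + " commit failed");
        }
        log.add(name + " committed");
    }

    @Override
    public void rollback() {
        log.add(name + " rolled back");
    }
}

/** Commits delegates in reverse registration order (last registered commits first). */
final class ReverseOrderCommitter {
    private final List<SimpleTx> delegates;

    ReverseOrderCommitter(List<SimpleTx> delegates) {
        this.delegates = new ArrayList<>(delegates);
    }

    void commit() {
        for (int i = delegates.size() - 1; i >= 0; i--) {
            try {
                delegates.get(i).commit();
            } catch (RuntimeException e) {
                // Roll back every delegate that has not committed yet.
                for (int j = i - 1; j >= 0; j--) {
                    delegates.get(j).rollback();
                }
                throw e;
            }
        }
    }
}

final class ChainedCommitDemo {
    /** Registers "jpa" before "jms", mirroring the bean definition above. */
    static List<String> run(boolean jmsFails) {
        List<String> log = new ArrayList<>();
        SimpleTx jpa = new RecordingTx("jpa", false, log);
        SimpleTx jms = new RecordingTx("jms", jmsFails, log);
        try {
            new ReverseOrderCommitter(List.of(jpa, jms)).commit();
        } catch (IllegalStateException expected) {
            // The failure is already recorded in the log.
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

In this model, a JMS commit failure is seen before the JPA delegate commits, so the JPA side can still be rolled back. The opposite case (JMS committed, then the database commit fails) is not covered by this kind of best-effort chaining.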
The only requirement I'm not sure how to satisfy properly is the after-database-commit action. The first idea that came to mind was the transaction-synchronization-factory mechanism, as in
<int:transaction-synchronization-factory id="mySyncFactory">
<int:after-commit channel="myOutputChannel" />
</int:transaction-synchronization-factory>
and link it to the transaction of the existing poller
<int:poller id="myInputChannelPoller">
<int:transactional transaction-manager="myTransactionManager" synchronization-factory="mySyncFactory" />
</int:poller>
But this approach seems to send the message that was initially received from myInputChannel, ignoring the manipulations applied to it within the chain.
Another idea would be to add another service-activator, MyJpaTransactionManagerInjector, to the end of the chain, which passes the message and the output channel to MyJpaTransactionManager, a subclass of JpaTransactionManager.
Receive the message and the target output channel as input. Create a ForwardMessage and inject it into MyJpaTransactionManager.
public class MyJpaTransactionManagerInjector {
@Inject
private MyJpaTransactionManager myJpaTransactionManager;
@ServiceActivator
public void injectMessage(Message<?> message, @Header(value = "outputChannel") MessageChannel outputChannel) {
myJpaTransactionManager.inject(new ForwardMessage(message, outputChannel));
}
}
Register the ForwardMessage before executing the commit.
public class MyJpaTransactionManager extends JpaTransactionManager {
private final ThreadLocal<List<TransactionSynchronizationAdapter>> synchronizations =
new NamedThreadLocal<>("Transactional resources");
public void inject(TransactionSynchronizationAdapter synchronization) {
if (synchronizations.get() == null) {
synchronizations.set(new ArrayList<>());
}
synchronizations.get().add(synchronization);
}
@Override
protected void prepareForCommit(DefaultTransactionStatus status) {
super.prepareForCommit(status);
if (synchronizations.get() != null) {
synchronizations.get().forEach(TransactionSynchronizationManager::registerSynchronization);
}
synchronizations.remove();
}
}
Forward the message to the target channel on afterCommit().
public class ForwardMessage extends TransactionSynchronizationAdapter {
private Message<?> message;
private MessageChannel outputChannel;
public ForwardMessage(Message<?> message, MessageChannel outputChannel) {
this.message = message;
this.outputChannel = outputChannel;
}
@Override
public void afterCommit() {
outputChannel.send(message);
}
}
Whenever possible, I would avoid subclassing a TransactionManager, since it's easy to break something that worked perfectly well. Is there a solution that should be preferred to this one?
Upvotes: 3
Views: 1461
Reputation: 121177
I would still stick with the <int:transaction-synchronization-factory>.
The PollingConsumer registers this:
TransactionSynchronizationManager.bindResource(resource,
integrationSynchronization.getResourceHolder());
into the transaction, where resource is the MessageChannel for the consumer. In your case it is the myInputChannel bean.
You can use something like:
IntegrationResourceHolder integrationResourceHolder = (IntegrationResourceHolder) TransactionSynchronizationManager.getResource(myInputChannel);
And call its addAttribute() to store the message after processing:
integrationResourceHolder.addAttribute("resultMessage", resultMessage);
Afterward you can still use that <int:after-commit channel="myOutputChannel" /> and access the attribute via the corresponding SpEL variable:
<int:after-commit channel="myOutputChannel" expression="#resultMessage"/>
Upvotes: 1