wegenmic

Reputation: 320

Transactional commits in correct order and after commit hook with Spring Integration

These are all the requirements that need to be fulfilled

The chain in question is the following

<int:poller id="myInputChannelPoller">
  <int:transactional transaction-manager="myTransactionManager" />
</int:poller>

<int:chain id="myChain" input-channel="myInputChannel">
  <int:poller ref="myInputChannelPoller" />
  <int:service-activator id="someMessageManipulation" />
  <int:service-activator id="aDatabaseOperation" />
  <int:service-activator id="sendJmsMessage" />
  <int:service-activator id="anotherMessageManipulation" />
  <int:service-activator id="anotherDatabaseOperation" />
</int:chain>

To wrap the chain in a transaction, I added the <int:transactional/> tag to the poller.

To have the JMS transaction committed before the database transaction, I used the ChainedTransactionManager:

@Bean
public PlatformTransactionManager myTransactionManager(JpaTransactionManager jpaTransactionManager, JmsTransactionManager jmsTransactionManager) {
        // JPA is listed before JMS because the commits are done in reverse order
        return new ChainedTransactionManager(jpaTransactionManager, jmsTransactionManager);
}

This also ensures that nothing is committed to the database when the JMS commit fails.
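
The reverse commit order can be illustrated with a small plain-Java model. This is only a sketch under stated assumptions: it uses no Spring classes, the `commitAll` method and the string-based manager names are hypothetical, and the rollback of the remaining managers after a failed commit reflects my understanding of ChainedTransactionManager rather than its actual code.

```java
import java.util.ArrayList;
import java.util.List;

public class ChainedCommitSketch {

    // Walks the managers in reverse registration order. `failing` names the
    // manager whose commit is assumed to throw (null means none fails).
    // Once one commit has failed, the remaining managers are rolled back.
    static List<String> commitAll(List<String> managers, String failing) {
        List<String> log = new ArrayList<>();
        boolean commit = true;
        for (int i = managers.size() - 1; i >= 0; i--) {
            String name = managers.get(i);
            if (!commit) {
                log.add(name + ":rollback");
            } else if (name.equals(failing)) {
                log.add(name + ":commit-failed");
                commit = false;
            } else {
                log.add(name + ":commit");
            }
        }
        return log;
    }

    public static void main(String[] args) {
        // Registered as (jpa, jms), so jms is committed first.
        System.out.println(commitAll(List.of("jpa", "jms"), null));
        // A failing jms commit means jpa is never committed.
        System.out.println(commitAll(List.of("jpa", "jms"), "jms"));
    }
}
```

In this model the first call yields `[jms:commit, jpa:commit]` and the second `[jms:commit-failed, jpa:rollback]`. Note that the opposite failure (JMS committed, database commit failing afterwards) is not covered by this ordering.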

The only requirement I'm not sure how to satisfy properly is the action that must run after the database commit. The first idea that came to mind was the transaction-synchronization-factory mechanism, as in

<int:transaction-synchronization-factory id="mySyncFactory">
  <int:after-commit channel="myOutputChannel" />
</int:transaction-synchronization-factory>

and link it to the transaction of the existing poller

<int:poller id="myInputChannelPoller">
  <int:transactional transaction-manager="myTransactionManager" synchronization-factory="mySyncFactory" />
</int:poller>

But this approach seems to send the message that was initially received from myInputChannel, ignoring the manipulations made inside the chain.

Another idea would be to add a further service activator, MyJpaTransactionManagerInjector, to the end of the chain. It hands the message and the output channel to MyJpaTransactionManager, a subclass of JpaTransactionManager.

Receive message and target output channel as input. Create a ForwardMessage and inject it to MyJpaTransactionManager.

public class MyJpaTransactionManagerInjector {

    @Inject
    private MyJpaTransactionManager myJpaTransactionManager;

    @ServiceActivator
    public void injectMessage(Message<?> message, @Header(value = "outputChannel") MessageChannel outputChannel) {
        myJpaTransactionManager.inject(new ForwardMessage(message, outputChannel));
    }
}

Register ForwardMessage before executing the commit.

public class MyJpaTransactionManager extends JpaTransactionManager {

    private final ThreadLocal<List<TransactionSynchronizationAdapter>> synchronizations =
            new NamedThreadLocal<>("Transactional resources");

    public void inject(TransactionSynchronizationAdapter synchronization) {
        if (synchronizations.get() == null) {
            synchronizations.set(new ArrayList<>());
        }
        synchronizations.get().add(synchronization);
    }

    @Override
    protected void prepareForCommit(DefaultTransactionStatus status) {            
        super.prepareForCommit(status);
        if (synchronizations.get() != null) {
            synchronizations.get().forEach(TransactionSynchronizationManager::registerSynchronization);
        }
        synchronizations.remove();
    }
}

Forward message on afterCommit() to target channel

public class ForwardMessage extends TransactionSynchronizationAdapter {

    private Message<?> message;
    private MessageChannel outputChannel;

    public ForwardMessage(Message<?> message, MessageChannel outputChannel) {
        this.message = message;
        this.outputChannel = outputChannel;
    }

    @Override
    public void afterCommit() {
        outputChannel.send(message);
    }
}

Whenever possible, I would avoid subclassing a TransactionManager since it's easy to break something that worked perfectly well. Is there a solution that should be preferred to this one?

Upvotes: 3

Views: 1461

Answers (1)

Artem Bilan

Reputation: 121177

I would still stick with the <int:transaction-synchronization-factory>.

The PollingConsumer registers this:

TransactionSynchronizationManager.bindResource(resource,
                                integrationSynchronization.getResourceHolder());

into the transaction.

Here resource is the MessageChannel that the consumer polls. In your case it is the myInputChannel bean.

You can use something like:

IntegrationResourceHolder integrationResourceHolder = (IntegrationResourceHolder) TransactionSynchronizationManager.getResource(myInputChannel);

And call its addAttribute() to store the message after processing:

integrationResourceHolder.addAttribute("resultMessage", resultMessage);

Afterwards you can still use <int:after-commit channel="myOutputChannel" /> and access that attribute via the corresponding SpEL variable:

<int:after-commit channel="myOutputChannel" expression="#resultMessage"/>
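
To make the flow easier to follow, here is a plain-Java model of the idea. It is a sketch only and does not use Spring: `Holder` stands in for IntegrationResourceHolder, the `RESOURCES` thread-local stands in for the resources bound through TransactionSynchronizationManager, and `lastHandler` and `pollOnce` are hypothetical names. In a real chain the last service activator would call `TransactionSynchronizationManager.getResource(myInputChannel)` and `addAttribute(...)` as shown above.

```java
import java.util.HashMap;
import java.util.Map;

public class ResourceHolderSketch {

    // Hypothetical stand-in for IntegrationResourceHolder: carries the polled
    // message plus arbitrary named attributes.
    static final class Holder {
        final String originalMessage;
        final Map<String, Object> attributes = new HashMap<>();

        Holder(String originalMessage) {
            this.originalMessage = originalMessage;
        }
    }

    // Hypothetical stand-in for the thread-bound resources, keyed by the
    // channel that the poller reads from.
    static final ThreadLocal<Map<Object, Holder>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    // Plays the role of the last handler in the chain: it looks up the holder
    // by channel and stores the processed message as an attribute.
    static void lastHandler(Object channel, String processed) {
        RESOURCES.get().get(channel).attributes.put("resultMessage", processed);
    }

    // Simulates one poll: bind the holder, run the "chain", and return what
    // the after-commit step would send once the commit has succeeded.
    static Object pollOnce(Object channel, String message) {
        Holder holder = new Holder(message);
        RESOURCES.get().put(channel, holder);
        try {
            lastHandler(channel, message.toUpperCase());
            // The commit would happen here; the after-commit step then reads
            // the attribute, falling back to the original message if absent.
            return holder.attributes.getOrDefault("resultMessage", holder.originalMessage);
        } finally {
            RESOURCES.get().remove(channel); // unbind, as after a real poll
        }
    }

    public static void main(String[] args) {
        System.out.println(pollOnce("myInputChannel", "hello")); // prints HELLO
    }
}
```

The point of the model is that the holder is keyed by the polled channel, so any handler running on the polling thread can reach it, and the after-commit step sees the processed message instead of the one originally received.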

Upvotes: 1
