ABX

Reputation: 1149

How to handle transactions for Spring integration flows (Java DSL)

How can I define a transaction that covers a complete flow in Spring Integration (Java DSL)?

With Spring Integration we can define an example flow like this:

@Bean
public IntegrationFlow myMessageFromMessageAmqpInboundFlow() {
    return IntegrationFlows.from(myInboundChannel)
            .transform(aMessageTransformer)
            .transform(anotherMessageTransformer)
            .channel(anOutputChannel)
            .get();
}

I need a transaction that spans the complete flow. Currently, when I access a database in 'aMessageTransformer', the transaction is closed as soon as that transformer has finished. But I need the transaction to still be open (uncommitted) while 'anotherMessageTransformer' is processing.

I expected that I would just have to add '@Transactional' (or @Transactional(propagation = Propagation.REQUIRED, readOnly = true)):

@Bean
@Transactional
public IntegrationFlow myMessageFromMessageAmqpInboundFlow() {
    return IntegrationFlows.from(myInboundChannel)
            .transform(aMessageTransformer)
            .transform(anotherMessageTransformer)
            .channel(anOutputChannel)
            .get();
}

But this leads to a 'no session' exception in 'anotherMessageTransformer'.

Upvotes: 3

Views: 3763

Answers (1)

Artem Bilan

Reputation: 121262

Follow the Spring Integration documentation on transaction support and add this to the first endpoint of your flow:

.transform(aMessageTransformer, e -> e.transactional(true))

where .transactional() is documented as:

/**
 * Specify a {@link TransactionInterceptor} {@link Advice} with default
 * {@code PlatformTransactionManager} and {@link DefaultTransactionAttribute} for the
 * {@link MessageHandler}.
 * @param handleMessageAdvice the flag to indicate the target {@link Advice} type:
 * {@code false} - regular {@link TransactionInterceptor}; {@code true} -
 * {@link org.springframework.integration.transaction.TransactionHandleMessageAdvice}
 * extension.
 * @return the spec.
 */
public S transactional(boolean handleMessageAdvice) {

The Javadoc of TransactionHandleMessageAdvice explains what that means:

 * When this {@link Advice} is used from the {@code request-handler-advice-chain}, it is applied
 * to the {@link MessageHandler#handleMessage}
 * (not to the
 * {@link org.springframework.integration.handler.AbstractReplyProducingMessageHandler.RequestHandler#handleRequestMessage}),
 * therefore the entire downstream process is wrapped to the transaction.
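
To make the difference between the two advice targets concrete, here is a minimal plain-Java sketch. It does not use Spring's actual classes: `Handler`, `TxMode`, `inTransaction`, and the event log are hypothetical stand-ins, and the only assumption carried over from the Javadoc is that `handleMessage` both produces the reply and hands it to the next handler on the same thread, while `handleRequestMessage` only produces the reply.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class TransactionScopeSketch {

    // Where the (simulated) transaction advice is applied on the first handler.
    public enum TxMode { NONE, REQUEST_ONLY, HANDLE_MESSAGE }

    // Event log standing in for a real transaction manager (hypothetical).
    private static final List<String> log = new ArrayList<>();

    // Simulated transaction boundary: begin, run the body, then commit or roll back.
    private static <T> T inTransaction(Supplier<T> body) {
        log.add("begin");
        try {
            T result = body.get();
            log.add("commit");
            return result;
        } catch (RuntimeException e) {
            log.add("rollback");
            throw e;
        }
    }

    // Simplified stand-in for a reply-producing message handler.
    private static final class Handler {
        private final String name;
        private final TxMode mode;
        private final Handler next; // downstream handler, or null at the end

        Handler(String name, TxMode mode, Handler next) {
            this.name = name;
            this.mode = mode;
            this.next = next;
        }

        // Produces the reply AND passes it downstream on the same thread.
        void handleMessage(String payload) {
            if (mode == TxMode.HANDLE_MESSAGE) {
                inTransaction(() -> { process(payload); return null; });
            } else {
                process(payload);
            }
        }

        private void process(String payload) {
            String reply;
            if (mode == TxMode.REQUEST_ONLY) {
                reply = inTransaction(() -> handleRequestMessage(payload));
            } else {
                reply = handleRequestMessage(payload);
            }
            if (next != null) {
                next.handleMessage(reply);
            }
        }

        // Produces the reply only; knows nothing about downstream handlers.
        private String handleRequestMessage(String payload) {
            log.add(name);
            return payload + ">" + name;
        }
    }

    // Runs a two-handler chain and returns the recorded order of events.
    public static List<String> run(TxMode firstHandlerMode) {
        log.clear();
        Handler second = new Handler("second", TxMode.NONE, null);
        Handler first = new Handler("first", firstHandlerMode, second);
        first.handleMessage("msg");
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(run(TxMode.REQUEST_ONLY));   // [begin, first, commit, second]
        System.out.println(run(TxMode.HANDLE_MESSAGE)); // [begin, first, second, commit]
    }
}
```

In this model, `REQUEST_ONLY` corresponds to a regular advice around the reply-producing call, so the transaction commits before the second handler runs. `HANDLE_MESSAGE` corresponds to what `transactional(true)` is described as doing: the second handler runs before the commit, which is the behaviour the question asks for.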

Upvotes: 2
