Reputation: 1149
How is it possible to define a transaction for a complete flow in Spring Integration (Java DSL)?
With Spring Integration we can define an example flow like this:
@Bean
public IntegrationFlow myMessageFromMessageAmqpInboundFlow() {
    return IntegrationFlows.from(myInboundChannel)
            .transform(aMessageTransformer)
            .transform(anotherMessageTransformer)
            .channel(anOutputChannel)
            .get();
}
I need a transaction that spans the complete flow. Currently, when I access a database in 'aMessageTransformer', the transaction is closed as soon as that transformer has finished. But I need a transaction that is still uncommitted while 'anotherMessageTransformer' is being processed.
I expected that I would just have to add '@Transactional' (or @Transactional(propagation = Propagation.REQUIRED, readOnly = true)):
@Bean
@Transactional
public IntegrationFlow myMessageFromMessageAmqpInboundFlow() {
    return IntegrationFlows.from(myInboundChannel)
            .transform(aMessageTransformer)
            .transform(anotherMessageTransformer)
            .channel(anOutputChannel)
            .get();
}
but this leads to a 'no session' exception in 'anotherMessageTransformer'.
Upvotes: 3
Views: 3763
Reputation: 121262
You need to follow this documentation and therefore add this to your flow:
.transform(aMessageTransformer, e -> e.transactional(true))
where that .transactional() is documented as:
/**
 * Specify a {@link TransactionInterceptor} {@link Advice} with default
 * {@code PlatformTransactionManager} and {@link DefaultTransactionAttribute} for the
 * {@link MessageHandler}.
 * @param handleMessageAdvice the flag to indicate the target {@link Advice} type:
 * {@code false} - regular {@link TransactionInterceptor}; {@code true} -
 * {@link org.springframework.integration.transaction.TransactionHandleMessageAdvice}
 * extension.
 * @return the spec.
 */
public S transactional(boolean handleMessageAdvice) {
The TransactionHandleMessageAdvice means:
* When this {@link Advice} is used from the {@code request-handler-advice-chain}, it is applied
* to the {@link MessageHandler#handleMessage}
* (not to the
* {@link org.springframework.integration.handler.AbstractReplyProducingMessageHandler.RequestHandler#handleRequestMessage}),
* therefore the entire downstream process is wrapped to the transaction.
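To make the difference between the two advice targets concrete, here is a minimal plain-Java sketch. It does not use Spring at all: the class TransactionScopeSketch, the Scope enum, and the inTransaction helper are hypothetical names invented for illustration, and "begin"/"commit" are just log entries standing in for a real transaction. It also assumes the downstream handlers run on the same thread as the first one, which is what lets a transaction wrapped around the first handler's handleMessage cover them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model only; none of these names come from Spring Integration.
class TransactionScopeSketch {

    // Which call of the first handler the (simulated) transaction wraps.
    enum Scope { HANDLE_REQUEST, HANDLE_MESSAGE }

    // Stand-in for a transaction interceptor: begin, run the work, commit.
    static void inTransaction(List<String> log, Runnable work) {
        log.add("begin");
        work.run();
        log.add("commit");
    }

    // Sends one message through first -> second -> output and returns the event order.
    static List<String> run(Scope scope) {
        List<String> log = new ArrayList<>();
        Consumer<String> output = msg -> log.add("output");
        Consumer<String> second = msg -> {
            log.add("second");
            output.accept(msg);
        };
        Consumer<String> first = msg -> {
            if (scope == Scope.HANDLE_MESSAGE) {
                // Wrapping the whole handleMessage: the reply is sent downstream
                // while the transaction is still open.
                inTransaction(log, () -> {
                    log.add("first");
                    second.accept(msg);
                });
            } else {
                // Wrapping only the request handling: the transaction commits
                // before the reply reaches the next handler.
                inTransaction(log, () -> log.add("first"));
                second.accept(msg);
            }
        };
        first.accept("payload");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(Scope.HANDLE_REQUEST)); // [begin, first, commit, second, output]
        System.out.println(run(Scope.HANDLE_MESSAGE)); // [begin, first, second, output, commit]
    }
}
```

In the HANDLE_MESSAGE case "second" is logged between "begin" and "commit", which mirrors the behaviour the question asks for: the second transformer still sees the uncommitted transaction opened for the first one.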
Upvotes: 2