Reputation: 381
I have a channel named "creationChannel" which is backed by a MongoDbChannelMessageStore like this:
@Bean
ChannelMessageStore messageStore() {
    return new MongoDbChannelMessageStore(mongoDatabaseFactory);
}

@Bean
PollableChannel creationChannel(ChannelMessageStore messageStore) {
    return MessageChannels.queue("creationChannel", messageStore, "create").get();
}
I want to use it in my flow below, but I want to be sure that a message is read (and removed) from the store only if "createOrderHandler" completes successfully. The same applies to "updateOrderHandler", but with a different channel.
...some code here...
.<HeadersElement, OperationType>route(
    payload -> route(payload),
    spec -> spec
        .transactional(transactionHandleMessageAdvice)
        .subFlowMapping(
            OperationType.New,
            sf -> sf
                .channel("creationChannel")
                .log(Level.DEBUG, "Creation of a new order", Message::getPayload)
                .transform(Mapper::mapCreate)
                .handle(createOrderHandler,
                    handlerSpec -> handlerSpec.advice(retryOperationsInterceptor))
        )
        .subFlowMapping(
            OperationType.Update,
            sf -> sf
                .channel("updateChannel")
                .log(Level.DEBUG, "Update for existing order", Message::getPayload)
                .transform(Mapper::mapUpdate)
                .handle(updateOrderHandler,
                    handlerSpec -> handlerSpec.advice(retryOperationsInterceptor))
        )
)
...some code here...
I tried to configure "transactionHandleMessageAdvice" like this:
@Bean
TransactionHandleMessageAdvice transactionHandleMessageAdvice(MongoTransactionManager transactionManager) {
    return new TransactionHandleMessageAdvice(transactionManager, new Properties());
}
But messages are still being deleted from the database after the handler fails with an exception.
Maybe I should configure Poller for subflows and configure it with MongoTransactionManager somehow?
Upvotes: 0
Views: 272
Reputation: 121272
"Maybe I should configure Poller for subflows and configure it with MongoTransactionManager somehow?"
That's a correct assumption. As soon as you have a thread shift in the flow (like your PollableChannel creationChannel), the current transaction is committed at the moment the message is placed into the store. Nothing more happens in the current thread and, therefore, nothing more happens in the transaction you started with .transactional(transactionHandleMessageAdvice).
To make reading transactional, you indeed have to configure a Poller on the .transform(Mapper::mapCreate) endpoint. So, every poll from that queue channel is going to be transactional until you shift to a different thread again.
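As a rough sketch of that suggestion, the "New" subflow from the question could look like the fragment below. It assumes a MongoTransactionManager bean is in scope under the hypothetical name mongoTransactionManager, and the one-second delay is arbitrary. Depending on the Spring Integration version, the exact transform(...) overload that accepts an endpoint configurer may differ, so treat this as an illustration rather than a drop-in fix.

```java
// Sketch only: the "New" subflow with a transactional poller on the first
// endpoint that consumes from the queue channel.
// Assumptions: `mongoTransactionManager` is a MongoTransactionManager in scope
// (hypothetical name); the 1000 ms fixed delay is an arbitrary choice.
// Requires: import org.springframework.integration.dsl.Pollers;
sf -> sf
        .channel("creationChannel")
        .log(Level.DEBUG, "Creation of a new order", Message::getPayload)
        .transform(Mapper::mapCreate,
                // Each poll from "creationChannel" starts a transaction on the poller thread.
                e -> e.poller(Pollers.fixedDelay(1000)
                        .transactional(mongoTransactionManager)))
        // Still on the poller thread, so it runs inside the same transaction.
        .handle(createOrderHandler,
                handlerSpec -> handlerSpec.advice(retryOperationsInterceptor))
```

With this arrangement the receive from the store and the downstream handler run on the same poller thread, so an exception that still propagates after the retry advice gives up should roll the transaction back instead of committing the removal. This presumes the MongoDB deployment supports transactions at all (a replica set or sharded cluster).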
There is just no way (and there must not be one) to make the whole async flow transactional, since transactions are tied to a ThreadLocal and, at the moment the call stack returns to the transaction initiator, the transaction is committed or rolled back. With async logic we intend to "send-and-forget" from the producer side and let the consumer deal with the data whenever it is ready. That is not what transactions were designed for.
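The thread-bound nature of a transaction can be illustrated with plain Java, without any Spring classes. In this minimal sketch, CURRENT_TX and QUEUE are hypothetical stand-ins for a transaction manager's thread-bound context and for a queue channel; this is not how Spring implements either one, only a demonstration that state held in a ThreadLocal does not follow a message across a thread hand-off.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ThreadBoundTransactionDemo {

    // Hypothetical stand-in for the thread-bound transaction context.
    static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    // Hypothetical stand-in for a queue channel: a hand-off point between threads.
    static final BlockingQueue<String> QUEUE = new LinkedBlockingQueue<>();

    /** Sends one message inside a "transaction" and returns what the consumer thread sees. */
    static String txSeenByConsumer() {
        String[] seen = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                QUEUE.take();                       // wait for the message
                String tx = CURRENT_TX.get();       // look up the consumer thread's own context
                seen[0] = (tx == null) ? "none" : tx;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        CURRENT_TX.set("producer-tx");              // "begin" on the producer thread
        try {
            QUEUE.put("order-1");                   // send-and-forget
            consumer.join();                        // wait so the result can be read safely
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            CURRENT_TX.remove();                    // "commit": the context ends with the producer
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println("Transaction seen by consumer: " + txSeenByConsumer());
    }
}
```

The consumer thread reports "none" even though the producer's context is still set while the message is consumed, which is why the consuming side needs its own transaction, started by its poller.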
Upvotes: 1