Constantine
Constantine

Reputation: 381

Spring Integration transaction for subflow of router

I have a channel named "creationChannel" which is backed up with MongoMessageStore like this:

  @Bean
  ChannelMessageStore messageStore() {
    return new MongoDbChannelMessageStore(mongoDatabaseFactory);
  }

  @Bean
  PollableChannel creationChannel(ChannelMessageStore messageStore) {
    return MessageChannels.queue("creationChannel", messageStore, "create").get();
  }

And I want to use it in my flow here, but I want to be sure, that message from there will be read-only if "createOrderHandler" worked fine (the same applies to "updateOrderHandler", but with the different channel).

...some code here...
        .<HeadersElement, OperationType>route(
            payload -> route(payload),
            spec -> spec
                .transactional(transactionHandleMessageAdvice)
                .subFlowMapping(
                    OperationType.New,
                    sf -> sf
                        .channel("creationChannel")
                        .log(Level.DEBUG, "Creation of a new order", Message::getPayload)
                        .transform(Mapper::mapCreate)
                        .handle(createOrderHandler,
                            handlerSpec -> handlerSpec.advice(retryOperationsInterceptor))
                )
                .subFlowMapping(
                    OperationType.Update,
                    sf -> sf
                        .channel("updateChannel")
                        .log(Level.DEBUG, "Update for existing order", Message::getPayload)
                        .transform(Mapper::mapUpdate)
                        .handle(updateOrderHandler,
                            handlerSpec -> handlerSpec.advice(retryOperationsInterceptor))
                )
        )
...some code here...

I tried to configure "transactionHandleMessageAdvice" like this:

  @Bean
  TransactionHandleMessageAdvice transactionHandleMessageAdvice(MongoTransactionManager transactionManager) {
    return new TransactionHandleMessageAdvice(transactionManager, new Properties());
  }

But messages are still being deleted from the database after the handler fails with an exception.

Maybe I should configure Poller for subflows and configure it with MongoTransactionManager somehow?

Upvotes: 0

Views: 272

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121272

Maybe I should configure Poller for subflows and configure it with MongoTransactionManager somehow?

That's correct assumption. As fast as you have a thread shifting in the flow (like yours PollableChannel creationChannel), the current transaction is committed at the moment the message is placed into the store. Nothing more happens in the current thread and, therefore, current transaction which you have started with that .transactional(transactionHandleMessageAdvice).

To make reading transactional, you indeed have to configure a Poller on the .transform(Mapper::mapCreate) endpoint. So, every poll from that queue channel is going to be transactional until you shift to different thread again.

There is just no way (and must not be) to have the whole async flow transactional since transactions are tied to the ThreadLocal and at the moment when call stack comes back to the transaction initiator, it is committed or rolled back. With an async logic we just intend to "send-and-forget" from the producer side and let consumer to deal with data whenever it is ready. That is not what transactions have been designed for.

Upvotes: 1

Related Questions