Reputation: 11818
I have a persistent, transactional queue containing messages I need to send over an asynchronous protocol. Each message needs to be sent in its own transaction, but the number of messages in flight at a given time precludes using thread-per-message, while throughput requirements preclude persisting intermediate states.
Looking at the code for JmsTransactionManager, I see it's using TransactionSynchronizationManager, which stores transaction resources in a ThreadLocal. So it seems that I'm going to need to implement a PlatformTransactionManager to somehow shepherd multiple transactions within a single thread. This seems a bit extreme...
Is there some arrangement of Spring Integration units that would obviate this complexity? Should I rather be looking into JTA/XA?
Upvotes: 1
Views: 3614
Reputation: 1095
All of these queue-based channels in Spring Integration are storing messages in-memory only by default. When persistence is required, you can either provide a 'message-store' attribute within the 'queue' element to reference a persistent MessageStore implementation, or you can replace the local channel with one that is backed by a persistent broker, such as a JMS-backed channel or Channel Adapter. The latter option allows you to take advantage of any JMS provider's implementation for message persistence.
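For the second option, a broker-backed channel could look roughly like the sketch below. This is only an illustration: it assumes the int-jms namespace is declared, and the channel id, queue name, and connection factory bean name are hypothetical placeholders rather than anything from my actual setup.

```xml
<!-- Assumes xmlns:int-jms="http://www.springframework.org/schema/integration/jms" is declared -->
<!-- "myJmsBackedChannel", "my.queue.name" and "myConnectionFactory" are hypothetical names -->
<int-jms:channel id="myJmsBackedChannel"
                 queue-name="my.queue.name"
                 connection-factory="myConnectionFactory"/>
```

With this arrangement the JMS provider, rather than a MessageStore, is responsible for persisting the messages.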
You can configure a Message Store for any QueueChannel by adding the message-store attribute as shown below.
Spring Integration provides support for the Message Store pattern by a) defining an org.springframework.integration.store.MessageStore strategy interface, b) providing several implementations of this interface, and c) exposing a message-store attribute on all components that have the capability to buffer messages so that you can inject any instance that implements the MessageStore interface.
My example uses the JDBC Message Store, but there are also several other options available.
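<!-- Namespace declarations are assumed here; they are needed for the integration: and int-jdbc: prefixes -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:integration="http://www.springframework.org/schema/integration"
       xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
         http://www.springframework.org/schema/integration/jdbc http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd">

    <!-- Pooled DataSource that backs the JDBC message store -->
    <bean id="myDataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="${my.jdbc.driver}"/>
        <property name="url" value="${my.jdbc.url}"/>
        <property name="username" value="${my.jdbc.username}"/>
        <property name="password" value="${my.jdbc.password}"/>
        <property name="minEvictableIdleTimeMillis" value="300000"/>
        <property name="timeBetweenEvictionRunsMillis" value="60000"/>
        <property name="connectionProperties" value="SetBigStringTryClob=true;"/>
    </bean>

    <!-- The poller is needed by any of the QueueChannels -->
    <integration:poller id="myPoller" fixed-rate="5000" default="true"/>

    <!-- The MessageStore is needed to persist messages used by any of the QueueChannels -->
    <int-jdbc:message-store id="myMessageStore" data-source="myDataSource" table-prefix="MY_INT_"/>

    <!-- Main entry point into the process -->
    <integration:gateway id="myGateway"
        service-interface="com.mycompany.myproject.integration.gateways.myGateway"
        default-request-channel="myGatewayChannel"/>

    <!-- Map the initial input channel to the first step, MyFirstService -->
    <integration:channel id="myGatewayChannel">
        <!-- In-memory variant, replaced by the persistent one: <integration:queue capacity="1000"/> -->
        <integration:queue message-store="myMessageStore" capacity="1000"/>
    </integration:channel>

    <!-- Step 1: My First Service -->
    <integration:service-activator
        id="myFirstServiceActivator"
        input-channel="myGatewayChannel"
        output-channel="myNextChannel"
        ref="myFirstService"
        method="process"/>

    <!-- LONG running process. Setup asynchronous queue channel. -->
    <integration:channel id="myNextChannel">
        <!-- In-memory variant, replaced by the persistent one: <integration:queue capacity="1000"/> -->
        <integration:queue message-store="myMessageStore" capacity="1000"/>
    </integration:channel>

</beans>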
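Since the question asks for each message to be handled in its own transaction, one possible refinement is to make the poller transactional, so that taking a message from the JDBC store and the processing done on the polling thread commit or roll back together. The following is a rough sketch under assumptions, not something I have tested against this exact setup: the bean id "myTxManager" is hypothetical, and it reuses the "myDataSource" bean from the configuration above.

```xml
<!-- Hypothetical bean id "myTxManager"; reuses the "myDataSource" bean from the configuration above -->
<bean id="myTxManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="myDataSource"/>
</bean>

<!-- Would replace the plain "myPoller" definition: each poll runs inside a transaction -->
<integration:poller id="myPoller" fixed-rate="5000" default="true">
    <integration:transactional transaction-manager="myTxManager"/>
</integration:poller>
```

Note that the transaction stays bound to the polling thread, so it only covers work done on that thread; once a message is handed off to another QueueChannel, the next poller picks it up in a separate transaction.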
Upvotes: 1