Reputation: 5487
I'm new to Spring Integration and I'm experimenting with the various components in a small project.
In the task at hand, I need to process a text file and store its contents in a database. The file holds lines that can be grouped together, so it is natural to divide each file into several independent messages.
This is the whole process (please see the config at the end):
1. transformers.outcomeTransf
2. (missing: save the source to the database)
3. splitters.outcomeSplit
4. transformers.SingleoutcomeToMap
5. stored-proc-outbound-channel-adapter
The database holds just two tables:
I'm missing the component for step 2. As I understand it, an outbound channel adapter "swallows" the message it handles, so that no other endpoint can receive it.
I thought about a publish-subscribe channel (without a TaskExecutor) after step 1, with a JDBC outbound adapter as the first subscriber and the splitter from step 3 as the second one: each subscribed handler should then receive a copy of the message, but it's not clear to me whether processing in the splitter would wait until the outbound adapter had finished.
Is this the right approach to the task? What if the transformer at step 4 is called asynchronously? Each split message is self-contained, which would call for concurrency.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
xmlns:beans="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
http://www.springframework.org/schema/integration/jdbc
http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd">
<!-- input-files-from-folder -->
<int-file:inbound-channel-adapter id="outcomeIn"
directory="file:/in-outcome">
<int:poller id="poller" fixed-delay="2500" />
</int-file:inbound-channel-adapter>
<int:transformer input-channel="outcomeIn" output-channel="outcomesChannel" method="transform">
<beans:bean class="transformers.outcomeTransf" />
</int:transformer>
<!-- save source to db! -->
<int:splitter input-channel="outcomesChannel" output-channel="singleoutcomeChannel" method="splitMessage">
<beans:bean class="splitters.outcomeSplit" />
</int:splitter>
<int:transformer input-channel="singleoutcomeChannel" output-channel="jdbcChannel" method="transform">
<beans:bean class="transformers.SingleoutcomeToMap" />
</int:transformer>
<int-jdbc:stored-proc-outbound-channel-adapter
data-source="dataSource" channel="jdbcChannel" stored-procedure-name="insert_outcome"
ignore-column-meta-data="true">
<int-jdbc:sql-parameter-definitions ... />
<int-jdbc:parameter ... />
</int-jdbc:stored-proc-outbound-channel-adapter>
<bean id="dataSource" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close" >
<property name="driverClassName" value="org.postgresql.Driver"/>
<property ... />
</bean>
</beans>
Upvotes: 1
Views: 328
Reputation: 121237
You're thinking along the right lines. When you have a PublishSubscribeChannel without an Executor, each subscriber waits until the previous one has finished its work. Therefore your splitter is not going to be called until everything is done on the DB side. Moreover, by default, when the first subscriber fails to handle a message (no DB connection?), none of the others will be called.
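As a rough sketch of that setup (not the asker's actual config): the fragment below declares outcomesChannel as a publish-subscribe channel with no task-executor and uses the order attribute to put the DB write ahead of the splitter. The table name outcome_source and the insert query are hypothetical, and the query assumes the payload produced by step 1 can be bound directly as a parameter.

```xml
<!-- No task-executor: subscribers are invoked one after another on the sending thread. -->
<!-- ignore-failures defaults to false, so a failure in the first subscriber stops the dispatch. -->
<int:publish-subscribe-channel id="outcomesChannel" />

<!-- Step 2 (hypothetical table and query): store the whole source first. -->
<int-jdbc:outbound-channel-adapter channel="outcomesChannel" order="1"
    data-source="dataSource"
    query="insert into outcome_source (content) values (:payload)" />

<!-- Step 3: called only after the adapter above has returned normally. -->
<int:splitter input-channel="outcomesChannel" order="2"
    output-channel="singleoutcomeChannel" method="splitMessage">
    <beans:bean class="splitters.outcomeSplit" />
</int:splitter>
```

Without explicit order values, subscribers are normally called in the order they subscribed, so setting order mainly makes the intent visible in the config.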
Another way to achieve similar behavior is to configure a <request-handler-advice-chain> with an ExpressionEvaluatingRequestHandlerAdvice: https://docs.spring.io/spring-integration/docs/5.0.4.RELEASE/reference/html/messaging-endpoints-chapter.html#expression-advice
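A minimal sketch of the advice-based variant, assuming Spring Integration 5.0.x property names: the adapter consumes outcomesChannel on its own, and the advice forwards the original payload to a channel only after the DB call succeeds. The channel name toSplitChannel, the table outcome_source and the query are hypothetical; the splitter would then use toSplitChannel as its input-channel.

```xml
<int-jdbc:outbound-channel-adapter channel="outcomesChannel"
    data-source="dataSource"
    query="insert into outcome_source (content) values (:payload)">
    <int-jdbc:request-handler-advice-chain>
        <beans:bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
            <!-- Evaluated against the request message once the insert has succeeded. -->
            <beans:property name="onSuccessExpressionString" value="payload" />
            <!-- Hypothetical channel: the evaluation result is sent here. -->
            <beans:property name="successChannelName" value="toSplitChannel" />
        </beans:bean>
    </int-jdbc:request-handler-advice-chain>
</int-jdbc:outbound-channel-adapter>
```

If the insert throws, nothing is sent to the success channel, so the splitter is never reached for that file.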
Any concurrency or multi-threading in the flow downstream of the splitter is independent of the DB logic. No parallel processing starts until the DB request has completed successfully.
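For the concurrency part, one possible sketch is to give the splitter's output channel a dispatcher backed by a task executor, so each split message is handed off to a pool thread. The executor id outcomeExecutor and the pool size are illustrative assumptions, and the task namespace (xmlns:task="http://www.springframework.org/schema/task") would have to be added to the beans element.

```xml
<!-- Hypothetical thread pool for the per-outcome processing (steps 4 and 5). -->
<task:executor id="outcomeExecutor" pool-size="4" />

<!-- Each message emitted by the splitter is dispatched on a pool thread. -->
<int:channel id="singleoutcomeChannel">
    <int:dispatcher task-executor="outcomeExecutor" />
</int:channel>
```

The splitter itself still runs on the thread that completed the DB write, so the ordering guarantee described above is unaffected.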
Upvotes: 1