watery

Reputation: 5487

Wait for jdbc outbound channel adapter to complete before further processing

I'm new to Spring Integration and I'm experimenting with the various components in a small project.

In the task at hand, I need to process a text file and store its contents in a database. The file holds lines that can be grouped together, so it would be natural to divide each file into several independent messages.

This is the whole process (please see the config at the end):

  1. do an initial file analysis;
    • done by transformers.outcomeTransf
  2. store some data to database (i.e. file name, file date, etc.);
    • ?
  3. split the file contents into several distinct messages;
    • done by splitters.outcomeSplit
  4. further analyze each message;
    • done by transformers.SingleoutcomeToMap
  5. store single message data to database referencing data stored at step 2.
    • done by stored-proc-outbound-channel-adapter

The database holds just two tables:

I'm missing the component for step 2. As I understand it, an outbound channel adapter "swallows" the message it handles, so that no other endpoint can receive it.

I thought about a publish-subscribe channel (without a TaskExecutor) after step 1, with a jdbc outbound adapter as the first subscriber and the splitter from step 3 as the second one: each subscribed handler should then receive a copy of the message, but it's not clear to me whether processing in the splitter would wait until the outbound adapter had finished.

Is this the right approach to the task? What if the transformer at step 4 is called asynchronously? Each split message is self-contained, which would call for concurrency.


Spring configuration:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-file="http://www.springframework.org/schema/integration/file"
    xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
    xmlns:beans="http://www.springframework.org/schema/beans"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/integration
            http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/integration/file
            http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
            http://www.springframework.org/schema/integration/jdbc
            http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd">

    <!-- input-files-from-folder -->
    <int-file:inbound-channel-adapter id="outcomeIn" 
                                  directory="file:/in-outcome">
        <int:poller id="poller" fixed-delay="2500" />
    </int-file:inbound-channel-adapter>

    <int:transformer input-channel="outcomeIn" output-channel="outcomesChannel" method="transform">
        <beans:bean class="transformers.outcomeTransf" />
    </int:transformer>

    <!--  save source to db! -->

    <int:splitter input-channel="outcomesChannel" output-channel="singleoutcomeChannel" method="splitMessage">
        <beans:bean class="splitters.outcomeSplit" />
    </int:splitter>

    <int:transformer input-channel="singleoutcomeChannel" output-channel="jdbcChannel" method="transform">
        <beans:bean class="transformers.SingleoutcomeToMap" />
    </int:transformer>

    <int-jdbc:stored-proc-outbound-channel-adapter   
        data-source="dataSource" channel="jdbcChannel" stored-procedure-name="insert_outcome" 
        ignore-column-meta-data="true">

        <int-jdbc:sql-parameter-definitions ... />

        <int-jdbc:parameter ... />

    </int-jdbc:stored-proc-outbound-channel-adapter>

    <bean id="dataSource" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close" >
        <property name="driverClassName" value="org.postgresql.Driver"/>
        <property ... />
    </bean>

</beans>

Upvotes: 1

Views: 328

Answers (1)

Artem Bilan

Reputation: 121237

You're thinking along the right lines. When you have a PublishSubscribeChannel without an Executor, subscribers are invoked sequentially on the calling thread, so each subscriber waits until the previous one has finished its work. Therefore your splitter is not going to be called until everything is done on the DB side. Moreover, by default, when the first subscriber fails to handle a message (no DB connection, for example), the others won't be called at all.
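A minimal sketch of that wiring, reusing the channel and bean names from the question. The table `source_file`, its column, and the use of the `file_name` header are assumptions for illustration only (the header is set by the file inbound adapter and survives the first transformer only if that transformer returns a payload rather than a full Message). The `order` attributes make the subscription order explicit:

```xml
<!-- No task-executor: subscribers run one after another on the caller's thread -->
<int:publish-subscribe-channel id="outcomesChannel" />

<!-- First subscriber: stores the file-level data (hypothetical table and column) -->
<int-jdbc:outbound-channel-adapter
    channel="outcomesChannel"
    data-source="dataSource"
    order="1"
    query="insert into source_file (name) values (:headers[file_name])" />

<!-- Second subscriber: invoked only after the insert above has returned without error -->
<int:splitter input-channel="outcomesChannel" output-channel="singleoutcomeChannel"
              method="splitMessage" order="2">
    <beans:bean class="splitters.outcomeSplit" />
</int:splitter>
```

If the insert throws, the exception propagates back to the poller and the splitter never sees the message, because `ignore-failures` on the publish-subscribe channel defaults to `false`.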

Another way to achieve similar behavior can be configured with the <request-handler-advice-chain> and ExpressionEvaluatingRequestHandlerAdvice: https://docs.spring.io/spring-integration/docs/5.0.4.RELEASE/reference/html/messaging-endpoints-chapter.html#expression-advice
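As a rough sketch of the advice-based variant, assuming Spring Integration 5.0 property names: the splitter would listen on a separate channel (here called `splitChannel`, a hypothetical name) and the advice forwards the payload there only after the JDBC handler has completed successfully. The table and column in the query are again placeholders:

```xml
<int-jdbc:outbound-channel-adapter
    channel="outcomesChannel"
    data-source="dataSource"
    query="insert into source_file (name) values (:headers[file_name])">
    <int-jdbc:request-handler-advice-chain>
        <beans:bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
            <!-- Evaluated against the original message once the insert succeeds -->
            <beans:property name="onSuccessExpressionString" value="payload" />
            <!-- The result is sent here; the splitter would use this as its input-channel -->
            <beans:property name="successChannel" ref="splitChannel" />
        </beans:bean>
    </int-jdbc:request-handler-advice-chain>
</int-jdbc:outbound-channel-adapter>

<int:channel id="splitChannel" />
```

The message delivered to the success channel is built by the advice, so any headers the splitter relies on should be checked rather than assumed to carry over.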

Any concurrency or multi-threading in the flow downstream of the splitter is unrelated to the DB logic: no parallel processing starts until the DB request has completed successfully.
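For the downstream concurrency, one option is to make the splitter's output channel an executor channel, so that each split message is handed to a thread pool. This is only a sketch; the bean name `outcomeExecutor` and the pool size are assumptions:

```xml
<!-- Hypothetical thread pool for processing the split messages in parallel -->
<beans:bean id="outcomeExecutor"
            class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <beans:property name="corePoolSize" value="4" />
</beans:bean>

<!-- Messages sent by the splitter are dispatched on the executor's threads -->
<int:channel id="singleoutcomeChannel">
    <int:dispatcher task-executor="outcomeExecutor" />
</int:channel>
```

The hand-off to other threads happens only at this channel, after the splitter has been invoked, so the file-level insert still completes first.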

Upvotes: 1
