ilj

Reputation: 869

Batch processing in JDBC gateway

My setup (simplified for clarity) is the following:

<int:inbound-channel-adapter channel="in" expression="0">
    <int:poller cron="0 0 * * * *"/>
    <int:header name="snapshot_date" expression="new java.util.Date()"/>
    <int:header name="correlationId" expression="T(java.util.UUID).randomUUID()"/>
    <!-- more here -->
</int:inbound-channel-adapter>

<int:recipient-list-router input-channel="in" apply-sequence="true">
    <int:recipient channel="data.source.1"/>
    <int:recipient channel="data.source.2"/>
    <!-- more here -->
</int:recipient-list-router>

<int:chain input-channel="data.source.1" output-channel="save">
    <int-jdbc:outbound-gateway data-source="db1" max-rows-per-poll="0">
        <int-jdbc:query>
            select * from large_dataset
        </int-jdbc:query>
    </int-jdbc:outbound-gateway>
    <int:header-enricher>
        <int:header name="source" value="data.source.1"/>
    </int:header-enricher>
</int:chain>

<int:chain input-channel="data.source.2" output-channel="save">
    <int-jdbc:outbound-gateway data-source="db1" max-rows-per-poll="0">
        <int-jdbc:query>
            select * from another_large_dataset
        </int-jdbc:query>
    </int-jdbc:outbound-gateway>
    <int:header-enricher>
        <int:header name="source" value="data.source.2"/>
    </int:header-enricher>
</int:chain>

<int:chain input-channel="save" output-channel="process">
    <int:splitter expression="T(com.google.common.collect.Lists).partition(payload, 1000)"/>
    <int:transformer>
        <int-groovy:script location="transform.groovy"/>
    </int:transformer>
    <int:service-activator expression="@db2.insertData(payload, headers)"/>
    <int:aggregator/>
</int:chain>

<int:chain input-channel="process" output-channel="nullChannel">
    <int:aggregator/>
    <int:service-activator expression="@finalProcessing.doSomething()"/>
</int:chain>

Let me explain the steps a little bit:

  1. The poller is triggered by cron; the message is enriched with some information about this run.
  2. The message is sent to multiple data-source chains.
  3. Each chain extracts data from a large dataset (100k+ rows); the result set message is marked with a source header.
  4. The result set is split into smaller chunks, transformed and inserted into db2.
  5. After all data sources have been polled, some complex processing is initiated, using the information about the run.

This configuration does the job so far, but it is not scalable. The main problem is that I have to load the full dataset into memory first and pass it along the pipeline, which might cause memory issues.

My question is: what is the simplest way to have the result set extracted from db1, pushed through the pipeline, and inserted into db2 in small batches?

Upvotes: 1

Views: 722

Answers (1)

Artem Bilan

Reputation: 121212

First of all, since version 4.0.4, Spring Integration's <splitter> supports an Iterator as the payload, to avoid memory overhead.
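
For example, something like this (just a sketch; the db1Reader bean and its stream() method are made-up names - the point is that it returns a java.util.Iterator backed by a streaming, forward-only ResultSet instead of a fully materialized List, and that downstream components then see one row per message):

<!-- hypothetical bean: @db1Reader.stream() is assumed to return a
     java.util.Iterator over the rows of large_dataset, backed by a
     streaming, forward-only ResultSet -->
<int:service-activator input-channel="data.source.1"
                       output-channel="toSplit"
                       expression="@db1Reader.stream()"/>

<!-- since 4.0.4 a plain splitter walks an Iterator payload lazily,
     emitting one message per row instead of holding the whole result set -->
<int:splitter input-channel="toSplit" output-channel="save"/>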

We have a test case for JDBC which shows that behaviour. But, as you can see, it is based on the Spring Integration Java DSL and Java 8 lambdas. (Yes, it can be done for older Java versions, without lambdas, too.) Even if this case is appropriate for you, your <aggregator> should not be in-memory, because it collects all the messages in its MessageStore.
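
For that matter the aggregators can be backed by a persistent store, e.g. (a sketch; it assumes the standard spring-integration-jdbc schema scripts have been applied, and targetDataSource is an illustrative name for a DataSource bean pointing at that database):

<!-- a persistent store keeps the aggregator's correlation groups off the heap;
     requires the INT_MESSAGE* tables from the spring-integration-jdbc scripts -->
<bean id="jdbcMessageStore"
      class="org.springframework.integration.jdbc.store.JdbcMessageStore">
    <constructor-arg ref="targetDataSource"/>
</bean>

<!-- replaces the bare <int:aggregator/> elements in your chains -->
<int:aggregator message-store="jdbcMessageStore"/>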

That's the first case.

Another option is based on a paging algorithm, where your SELECT accepts a pair of WHERE params in your DB dialect. For Oracle it can look like: Paging with Oracle, where the pageNumber is some message header - :headers[pageNumber].
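
A sketch of such a paged SELECT in the gateway (the page size of 1000 and the id ordering column are just assumptions; note the XML-escaped comparison operators):

<!-- sketch: pageNumber starts at 1; the ORDER BY must be deterministic
     so that consecutive pages do not overlap -->
<int-jdbc:outbound-gateway data-source="db1">
    <int-jdbc:query>
        select *
        from (select t.*, ROWNUM rn
              from (select * from large_dataset order by id) t
              where ROWNUM &lt;= :headers[pageNumber] * 1000)
        where rn &gt; (:headers[pageNumber] - 1) * 1000
    </int-jdbc:query>
</int-jdbc:outbound-gateway>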

After that you do a trick with a <recipient-list-router> to send the SELECT result to the save channel and also to some other channel which increments the pageNumber header value and sends the message back to the data.source.1 channel, and so on. When the pageNumber goes beyond the data scope, the <int-jdbc:outbound-gateway> stops producing results.
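
A sketch of that loop (the page.result and next.page channel names are illustrative; the initial pageNumber header, e.g. 1, would be set in the <int:inbound-channel-adapter> next to snapshot_date):

<!-- fan out each page: one copy goes to 'save', the other loops back
     with an incremented pageNumber to select the next page -->
<int:recipient-list-router input-channel="page.result">
    <int:recipient channel="save"/>
    <int:recipient channel="next.page"/>
</int:recipient-list-router>

<int:header-enricher input-channel="next.page" output-channel="data.source.1">
    <int:header name="pageNumber" overwrite="true"
                expression="headers['pageNumber'] + 1"/>
</int:header-enricher>

When a page SELECT returns no rows there is simply no reply message (with requires-reply="false" on the gateway), so the loop ends by itself.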

Something like that.

I don't say that it is so easy, but it should be a starting point for you, at least.

Upvotes: 1
