Reputation: 869
My setup (simplified for clarity) is the following:
<int:inbound-channel-adapter channel="in" expression="0">
    <int:poller cron="0 0 * * * *"/>
    <int:header name="snapshot_date" expression="new java.util.Date()"/>
    <int:header name="correlationId" expression="T(java.util.UUID).randomUUID()"/>
    <!-- more here -->
</int:inbound-channel-adapter>
<int:recipient-list-router input-channel="in" apply-sequence="true">
    <int:recipient channel="data.source.1"/>
    <int:recipient channel="data.source.2"/>
    <!-- more here -->
</int:recipient-list-router>
<int:chain input-channel="data.source.1" output-channel="save">
<int-jdbc:outbound-gateway data-source="db1" max-rows-per-poll="0">
<int-jdbc:query>
select * from large_dataset
</int-jdbc:query>
</int-jdbc:outbound-gateway>
<int:header-enricher>
<int:header name="source" value="data.source.1"/>
</int:header-enricher>
</int:chain>
<int:chain input-channel="data.source.2" output-channel="save">
<int-jdbc:outbound-gateway data-source="db1" max-rows-per-poll="0">
<int-jdbc:query>
select * from another_large_dataset
</int-jdbc:query>
</int-jdbc:outbound-gateway>
<int:header-enricher>
<int:header name="source" value="data.source.2"/>
</int:header-enricher>
</int:chain>
<int:chain input-channel="save" output-channel="process">
<int:splitter expression="T(com.google.common.collect.Lists).partition(payload, 1000)"/>
<int:transformer>
<int-groovy:script location="transform.groovy"/>
</int:transformer>
<int:service-activator expression="@db2.insertData(payload, headers)"/>
<int:aggregator/>
</int:chain>
<int:chain input-channel="process" output-channel="nullChannel">
<int:aggregator/>
<int:service-activator expression="@finalProcessing.doSomething()"/>
</int:chain>
Let me explain the steps a little bit:

- a cron-triggered inbound-channel-adapter starts the flow and stamps the snapshot_date and correlationId headers;
- the recipient-list-router fans the trigger message out to one channel per data source;
- each data-source chain runs its SELECT against db1 and tags the result with a source header;
- the save chain partitions the full result list into batches of 1000 rows, transforms each batch with a Groovy script and inserts it into db2;
- the aggregators reassemble the batches and @finalProcessing.doSomething() runs once everything is done.

This configuration does the job so far, but it is not scalable. The main problem is that I have to load the full dataset into memory first and pass it along the pipeline, which may cause memory issues.

My question is: what is the simplest way to have the result set extracted from db1, pushed through the pipeline and inserted into db2 in small batches?
Upvotes: 1
Views: 722
Reputation: 121212
First of all, since version 4.0.4, Spring Integration's <splitter> supports an Iterator as payload, to avoid memory overhead.
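For example, a minimal sketch in the same XML style (the @streamingDao bean and its streamLargeDataset() method are hypothetical here; the only requirement is that the method returns a java.util.Iterator over the rows, e.g. backed by an open cursor, rather than a fully loaded List):

<int:chain input-channel="data.source.1" output-channel="save">
    <!-- hypothetical bean method returning a java.util.Iterator over the rows -->
    <int:service-activator expression="@streamingDao.streamLargeDataset()"/>
    <!-- with an Iterator payload the splitter emits one row message at a time
         instead of materializing the whole result set in memory -->
    <int:splitter/>
</int:chain>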
We have a test case for JDBC which shows that behaviour. But, as you can see, it is based on the Spring Integration Java DSL and Java 8 lambdas. (Yes, it can be done even for older Java versions, without lambdas.) Even if this case is appropriate for you, your <aggregator> should not be in-memory, because it collects all messages in its MessageStore.
That's the first case.
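For instance, the aggregators in the save and process chains could reference a persistent store instead of the default in-memory one (messageStore here is an assumed bean name for a persistent MessageStore implementation, e.g. a JDBC-backed one, configured elsewhere):

<!-- back the aggregator with a persistent MessageStore instead of the in-memory default -->
<int:aggregator message-store="messageStore"/>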
Another option is based on a paging algorithm, where your SELECT accepts a pair of WHERE params in your DB dialect. For Oracle it can look like: Paging with Oracle, where the pageNumber is some message header - :headers[pageNumber].
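As a sketch only, following the same simplified style as the question (a fixed page size of 1000, a pageNumber header starting at 0, and an id column to order by are all assumptions; adjust to your Oracle version and schema):

<int-jdbc:outbound-gateway data-source="db1">
    <int-jdbc:query>
        select * from (
            select t.*, rownum rn
            from (select * from large_dataset order by id) t
            where rownum &lt;= (:headers[pageNumber] + 1) * 1000
        )
        where rn &gt; :headers[pageNumber] * 1000
    </int-jdbc:query>
</int-jdbc:outbound-gateway>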
After that you do some trick with the <recipient-list-router> to send the SELECT result to the save channel and also to some other channel which increments the pageNumber header value and sends the message back to the data.source.1 channel, and so on. When the pageNumber runs past the end of the data, the <int-jdbc:outbound-gateway> stops producing results.
Something like that.
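One possible shape for that loop, sketched with assumed channel names (page.result, next.page) and the same assumed page size of 1000; in this variation the loop stops via a selector as soon as a page comes back smaller than the page size, instead of waiting for the gateway to return nothing (the pageNumber header would need to be initialised to 0 up front, e.g. in the inbound adapter's header enrichment):

<!-- the data.source.1 chain would send its SELECT result here instead of straight to 'save' -->
<int:recipient-list-router input-channel="page.result">
    <int:recipient channel="save"/>
    <int:recipient channel="next.page" selector-expression="payload.size() == 1000"/>
</int:recipient-list-router>

<!-- increment the pageNumber header and loop back for the next page -->
<int:header-enricher input-channel="next.page" output-channel="data.source.1">
    <int:header name="pageNumber" expression="headers['pageNumber'] + 1" overwrite="true"/>
</int:header-enricher>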
I don't say it's that easy, but it should be a starting point for you, at least.
Upvotes: 1