Reputation: 969
This appears, to me, to be a simple problem that is probably replicated all over the place. A very basic application of the MessageHandlerChain, probably using nothing more than out of the box functionality.
Conceptually, what I need is this:
(1) Polled JDBC reader (sets parameters for integration pass)
|
V
(2) JDBC Reader (uses input from (1) to fetch data to feed through channel
|
V
(3) JDBC writer (writes data fetched by (2) to target)
|
V
(4) JDBC writer (writes additional data from the original parameters fetched in (1))
What I think I need is
Flow:
From: JdbcPollingChannelAdapter (setup adapter)
Handler: messageHandlerChain
Handlers (
JdbcPollingChannelAdapter (inbound adapter)
JdbcOutboundGateway (outbound adapter)
JdbcOutboundGateway (cleanup gateway)
)
The JdbcPollingChannelAdapter does not implement the MessageHandler API, so I am at a loss how to read the actual data based on the setup step.
Since the JdbcOutboundGateway does not implement the MessageProducer API, I am at a bit of a loss as to what I need to use for the outbound adapter.
Are there OOB classes I should be using? Or do I need to somehow wrap the two adapters in BridgeHandlers to make this work?
Thanks in advance
EDIT (2) Additional configuration problem
The setup adapter is pulling a single row back with two timestamp columns. They are being processed correctly by the "enrich headers" piece.
However, when the inbound adapter is executing, the framework is passing in java.lang.Object as parameters. Not String, not Timestamp, but an actual java.lang.Object as in new Object ()
.
It is passing the correct number of objects, but the content and datatypes are lost. Am I correct that the ExpressionEvaluatingSqlParameterSourceFactory needs to be configured?
Message:
GenericMessage [payload=[{startTime=2020-11-18 18:01:34.90944, endTime=2020-11-18 18:01:34.90944}], headers={startTime=2020-11-18 18:01:34.90944, id=835edf42-6f69-226a-18f4-ade030c16618, timestamp=1605897225384}]
SQL in the JdbcOutboundGateway:
Select t.*, w.operation as "ops" from ADDRESS t
Inner join TT_ADDRESS w
on (t.ADDRESSID = w.ADDRESSID)
And (w.LASTUPDATESTAMP >= :payload.from[0].get("startTime") and w.LASTUPDATESTAMP <= :payload.from[0].get("endTime") )
Edit: added solution java DSL configuration
private JdbcPollingChannelAdapter setupAdapter; // select only
private JdbcOutboundGateway inboundAdapter; // select only
private JdbcOutboundGateway insertUpdateAdapter; // update only
private JdbcOutboundGateway deleteAdapter; // update only
private JdbcMessageHandler cleanupAdapter; // update only
setFlow(IntegrationFlows
.from(setupAdapter, c -> c.poller(Pollers.fixedRate(1000L, TimeUnit.MILLISECONDS).maxMessagesPerPoll(1)))
.enrichHeaders(h -> h.headerExpression("ALC_startTime", "payload.from[0].get(\"ALC_startTime\")")
.headerExpression("ALC_endTime", "payload.from[0].get(\"ALC_endTime\")"))
.handle(inboundAdapter)
.enrichHeaders(h -> h.headerExpression("ALC_operation", "payload.from[0].get(\"ALC_operation\")"))
.handle(insertUpdateAdapter)
.handle(deleteAdapter)
.handle(cleanupAdapter)
.get());
flowContext.registration(flow).id(this.getId().toString()).register();
Upvotes: 0
Views: 359
Reputation: 121177
If you would like to carry the original arguments down to the last gateway in your flow, you need to store those arguments in the headers since after every step the payload of reply message is going to be different and you won't have original setup data over there any more. That's first.
Second: if you deal with IntegrationFlow
and Java DSL, you don't need to worry about messageHandlerChain
since conceptually the IntegrationFlow
is a chain by itself but much more advance.
I'm not sure why you need to use a JdbcPollingChannelAdapter
to request data on demand according incoming message from the source in the beginning of your flow.
You definitely still need to use a JdbcOutboundGateway
for just SELECT
mode. The updateQuery
is optional, so that gateway is just going to perform SELECT
and return a data for you in a payload of the reply message.
If you two next steps are just "write" and you don't care about the result, you probably can just take a look into a PublishSubscribeChannel
and two JdbcMessageHandler
as subscribers to it. Without a provided Executor
for the PublishSubscribeChannel
they are going to be executed one-by-one.
Upvotes: 1