pojo-guy
pojo-guy

Reputation: 969

Spring integration: How to implement a JDBCOutboundGateway in the middle of a MessageChain?

This appears, to me, to be a simple problem that is probably replicated all over the place. A very basic application of the MessageHandlerChain, probably using nothing more than out of the box functionality.

Conceptually, what I need is this:

(1) Polled JDBC reader (sets parameters for integration pass)
    |
    V
(2) JDBC Reader (uses input from (1) to fetch data to feed through channel
    |
    V
(3) JDBC writer (writes data fetched by (2) to target)
    |
    V
(4) JDBC writer (writes additional data from the original parameters fetched in (1))

What I think I need is

Flow:
From: JdbcPollingChannelAdapter (setup adapter)
Handler: messageHandlerChain
    Handlers (
       JdbcPollingChannelAdapter (inbound adapter)
       JdbcOutboundGateway (outbound adapter)
       JdbcOutboundGateway (cleanup gateway)
    )

The JdbcPollingChannelAdapter does not implement the MessageHandler API, so I am at a loss how to read the actual data based on the setup step.

Since the JdbcOutboundGateway does not implement the MessageProducer API, I am at a bit of a loss as to what I need to use for the outbound adapter.

Are there OOB classes I should be using? Or do I need to somehow wrap the two adapters in BridgeHandlers to make this work?

Thanks in advance


EDIT (2) Additional configuration problem

The setup adapter is pulling a single row back with two timestamp columns. They are being processed correctly by the "enrich headers" piece.

However, when the inbound adapter is executing, the framework is passing in java.lang.Object as parameters. Not String, not Timestamp, but an actual java.lang.Object as in new Object ().

It is passing the correct number of objects, but the content and datatypes are lost. Am I correct that the ExpressionEvaluatingSqlParameterSourceFactory needs to be configured?

Message:

GenericMessage [payload=[{startTime=2020-11-18 18:01:34.90944, endTime=2020-11-18 18:01:34.90944}], headers={startTime=2020-11-18 18:01:34.90944, id=835edf42-6f69-226a-18f4-ade030c16618, timestamp=1605897225384}]

SQL in the JdbcOutboundGateway:

Select t.*, w.operation as "ops" from ADDRESS t
Inner join TT_ADDRESS w 
  on (t.ADDRESSID = w.ADDRESSID)
  And (w.LASTUPDATESTAMP >= :payload.from[0].get("startTime") and w.LASTUPDATESTAMP <= :payload.from[0].get("endTime") )

Edit: added solution java DSL configuration

private JdbcPollingChannelAdapter setupAdapter; // select only
private JdbcOutboundGateway inboundAdapter; // select only
private JdbcOutboundGateway insertUpdateAdapter; // update only
private JdbcOutboundGateway deleteAdapter; // update only
private JdbcMessageHandler cleanupAdapter; // update only

        setFlow(IntegrationFlows
            .from(setupAdapter, c -> c.poller(Pollers.fixedRate(1000L, TimeUnit.MILLISECONDS).maxMessagesPerPoll(1)))
            .enrichHeaders(h -> h.headerExpression("ALC_startTime", "payload.from[0].get(\"ALC_startTime\")")
                    .headerExpression("ALC_endTime", "payload.from[0].get(\"ALC_endTime\")"))
            .handle(inboundAdapter)
            .enrichHeaders(h -> h.headerExpression("ALC_operation", "payload.from[0].get(\"ALC_operation\")"))
            .handle(insertUpdateAdapter)
            .handle(deleteAdapter)
            .handle(cleanupAdapter)
            .get());

flowContext.registration(flow).id(this.getId().toString()).register();

Upvotes: 0

Views: 359

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121177

If you would like to carry the original arguments down to the last gateway in your flow, you need to store those arguments in the headers since after every step the payload of reply message is going to be different and you won't have original setup data over there any more. That's first.

Second: if you deal with IntegrationFlow and Java DSL, you don't need to worry about messageHandlerChain since conceptually the IntegrationFlow is a chain by itself but much more advance.

I'm not sure why you need to use a JdbcPollingChannelAdapter to request data on demand according incoming message from the source in the beginning of your flow.

You definitely still need to use a JdbcOutboundGateway for just SELECT mode. The updateQuery is optional, so that gateway is just going to perform SELECT and return a data for you in a payload of the reply message.

If you two next steps are just "write" and you don't care about the result, you probably can just take a look into a PublishSubscribeChannel and two JdbcMessageHandler as subscribers to it. Without a provided Executor for the PublishSubscribeChannel they are going to be executed one-by-one.

Upvotes: 1

Related Questions