Going Bananas

Reputation: 2537

JMS Outbound Gateway request destination - post process on success

I am using a JMS Outbound Gateway to send messages to a request queue and receive messages from a separate response queue. I would like to add functionality so that a call is made to a specific bean's method once a message has been successfully sent to the request queue.

I am using the spring-integration 4.0.4 and spring-integration-java-dsl 1.0.0 APIs for this, and so far I have been able to achieve the above functionality as follows:

@Configuration
@EnableIntegration
public class IntegrationConfig {

    ...

    @Bean
    public IntegrationFlow requestFlow() {

        return IntegrationFlows
            .from("request.ch")
            .routeToRecipients(r ->
                r.ignoreSendFailures(false)
                 .recipient("request.ch.1", "true")
                 .recipient("request.ch.2", "true"))
            .get();
    }

    @Bean
    public IntegrationFlow sendReceiveFlow() {

        return IntegrationFlows
            .from("request.ch.1")
            .handle(Jms.outboundGateway(cachingConnectionFactory)
                    .receiveTimeout(45000)
                    .requestDestination("REQUEST_QUEUE")
                    .replyDestination("RESPONSE_QUEUE")
                    .correlationKey("JMSCorrelationID"), e -> e.requiresReply(true))
                    .channel("response.ch").get();
    }

    @Bean
    public IntegrationFlow postSendFlow() {

        return IntegrationFlows
            .from("request.ch.2")
            .handle("requestSentService", "fireRequestSuccessfullySentEvent")
            .get();
    }

    ...
}

Although the above configuration works, I have noticed that request.ch.1 seems to be called before request.ch.2 only because of the alphabetical order of the channel names, and not because of the order in which they were added to the RecipientListRouter itself. Is this correct, or am I missing something here?

* EDIT: the solution below uses an Aggregator between JMS Outbound and Inbound Adapters (without a Messaging Gateway) *

Integration Config:

@Configuration
@EnableIntegration
public class IntegrationConfig { 

    ...

    @Bean
    public IntegrationFlow reqFlow() {

        return IntegrationFlows
            .from("request.ch")
            .enrichHeaders(e -> e.headerChannelsToString())
            .enrichHeaders(e -> e.headerExpression(IntegrationMessageHeaderAccessor.CORRELATION_ID, "headers['" + MessageHeaders.REPLY_CHANNEL + "']"))             
            .routeToRecipients(r -> {
                r.ignoreSendFailures(false);
                r.recipient("jms.req.ch", "true");
                r.recipient("jms.agg.ch", "true");
            })
            .get();
    }

    @Bean
    public IntegrationFlow jmsReqFlow() {

        return IntegrationFlows
            .from("jms.req.ch")
            .handle(Jms.outboundAdapter(cachingConnectionFactory)
                    .destination("TEST_REQUEST_CH")).get();
    }

    @Bean
    public IntegrationFlow jmsPostReqFlow() {

        return IntegrationFlows
            .from("jms.req.ch")
            .handle("postSendService", "postSendProcess")
            .get();
    }

    @Bean
    public IntegrationFlow jmsResFlow() {

        return IntegrationFlows
            .from(Jms.inboundAdapter(cachingConnectionFactory).destination(
                    "TEST_RESPONSE_CH"),
                    c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(10)))
            .channel("jms.agg.ch").get();
    }

    @Bean
    public IntegrationFlow jmsAggFlow() {

        return IntegrationFlows
            .from("jms.agg.ch")
            .aggregate(a -> { 
                a.outputProcessor(g -> {
                    List<Message<?>> l = new ArrayList<Message<?>>(g.getMessages());

                    Message<?> firstMessage = l.get(0);
                    Message<?> lastMessage = (l.size() > 1) ? l.get(l.size() - 1) : firstMessage;

                    Message<?> messageOut = MessageBuilder.fromMessage(lastMessage)
                            .setHeader(MessageHeaders.REPLY_CHANNEL, (String) firstMessage.getHeaders().getReplyChannel())
                            .build();

                    return messageOut;
                }); 
                a.releaseStrategy(g -> g.size() == 2);
                a.groupTimeout(45000);
                a.sendPartialResultOnExpiry(false);
                a.discardChannel("jms.agg.timeout.ch");
            }, null)
            .channel("response.ch")
            .get();
    }

    @Bean
    public IntegrationFlow jmsAggTimeoutFlow() {
        return IntegrationFlows
            .from("jms.agg.timeout.ch")
            .handle(Message.class, (m, h) -> new ErrorMessage(new MessageTimeoutException(m), h))
            .channel("error.ch")
            .get();
    }
}
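The aggregator step in the configuration above can be pictured in plain Java. This is only a sketch of the idea, not Spring Integration code: Msg and PairAggregatorSketch are hypothetical names, the group timeout and discard channel are left out, and it assumes the copy of the request and the JMS response carry the same correlation id. A group is released once it holds two messages, and the output takes the payload of the last message and the reply channel of the first.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Plain-Java simulation of the correlation step; not Spring Integration types.
class PairAggregatorSketch {

    // Hypothetical, minimal stand-in for a message with the headers that matter here.
    static final class Msg {
        final String correlationId;
        final String replyChannel; // may be null, e.g. on the JMS response
        final String payload;

        Msg(String correlationId, String replyChannel, String payload) {
            this.correlationId = correlationId;
            this.replyChannel = replyChannel;
            this.payload = payload;
        }
    }

    // Messages collected so far, keyed by correlation id.
    private final Map<String, List<Msg>> groups = new HashMap<>();

    // Adds a message to its group and releases the group once it holds two messages.
    Optional<Msg> accept(Msg m) {
        List<Msg> group = groups.computeIfAbsent(m.correlationId, k -> new ArrayList<>());
        group.add(m);
        if (group.size() < 2) {
            return Optional.empty(); // still waiting for the other half
        }
        groups.remove(m.correlationId);
        Msg first = group.get(0);
        Msg last = group.get(group.size() - 1);
        // Payload of the last message, reply channel restored from the first.
        return Optional.of(new Msg(last.correlationId, first.replyChannel, last.payload));
    }

    public static void main(String[] args) {
        PairAggregatorSketch aggregator = new PairAggregatorSketch();
        aggregator.accept(new Msg("c1", "reply-1", "request"));
        aggregator.accept(new Msg("c1", null, "response"))
                .ifPresent(out -> System.out.println(out.payload + " -> " + out.replyChannel));
    }
}
```

In this sketch the first call returns an empty Optional, and only the second call with the same correlation id produces an output message.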

Cheers, PM

Upvotes: 3

Views: 1131

Answers (1)

Artem Bilan

Reputation: 121272

Hm, it looks like it. This really is a bug in the DslRecipientListRouter logic: https://github.com/spring-projects/spring-integration-java-dsl/issues/9 It will be fixed soon and released within a couple of days.

Thank you for pointing that out!

BTW, your logic isn't quite correct: even when we fix that RecipientListRouter, the second recipient will receive the same request message only after the JmsOutboundGateway has received the reply, not just after the request has been sent to the request queue. It is a blocking request-reply process, and there is no hook in the JmsOutboundGateway to get at the point between request and reply.

Is that OK for you?
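To make the ordering concrete, here is a minimal plain-Java simulation of what is described above. It is a sketch only and uses no Spring Integration classes: BlockingGatewaySketch, routeToRecipients and the two Consumer stand-ins are hypothetical names, and it assumes the recipients are invoked one after another on the sending thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java simulation; none of these types are Spring Integration classes.
class BlockingGatewaySketch {

    // Sends the same message to each recipient in turn, on the caller's thread.
    static void routeToRecipients(String message, List<Consumer<String>> recipients) {
        for (Consumer<String> recipient : recipients) {
            recipient.accept(message); // returns only when this recipient has finished
        }
    }

    // Runs the scenario and returns the events in the order they happened.
    static List<String> run() {
        List<String> events = new ArrayList<>();

        // Stand-in for the outbound gateway: send the request, then wait for the reply.
        Consumer<String> gateway = msg -> {
            events.add("request sent");
            events.add("reply received"); // in reality a blocking receive with a timeout
        };

        // Stand-in for the post-send service.
        Consumer<String> postSend = msg -> events.add("post-send hook called");

        routeToRecipients("payload", Arrays.asList(gateway, postSend));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Under these assumptions the post-send stand-in runs only after the gateway stand-in has both sent the request and received the reply, which is why the second recipient cannot serve as a "request sent" hook.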

Upvotes: 1
