Reputation: 2537
I am using a JMS Outbound Gateway to send messages to a request queue and receive messages from a separate response queue. I would like to add functionality so that a specific bean's method is called once a message has been successfully sent to the request queue.
I am using the spring-integration 4.0.4 and spring-integration-java-dsl 1.0.0 APIs for this, and I have so far been able to achieve the above functionality as follows:
@Configuration
@EnableIntegration
public class IntegrationConfig {

    ...

    @Bean
    public IntegrationFlow requestFlow() {
        return IntegrationFlows
                .from("request.ch")
                .routeToRecipients(r ->
                        r.ignoreSendFailures(false)
                                .recipient("request.ch.1", "true")
                                .recipient("request.ch.2", "true"))
                .get();
    }

    @Bean
    public IntegrationFlow sendReceiveFlow() {
        return IntegrationFlows
                .from("request.ch.1")
                .handle(Jms.outboundGateway(cachingConnectionFactory)
                                .receiveTimeout(45000)
                                .requestDestination("REQUEST_QUEUE")
                                .replyDestination("RESPONSE_QUEUE")
                                .correlationKey("JMSCorrelationID"),
                        e -> e.requiresReply(true))
                .channel("response.ch")
                .get();
    }

    @Bean
    public IntegrationFlow postSendFlow() {
        return IntegrationFlows
                .from("request.ch.2")
                .handle("requestSentService", "fireRequestSuccessfullySentEvent")
                .get();
    }

    ...

}
Now, although the above configuration works, I have noticed that the only apparent reason request.ch.1 is called before request.ch.2 seems to be the alphabetical order of the channel names, not the order in which they were added to the RecipientListRouter itself. Is this correct? Or am I missing something here?
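One way to check this suspicion is to register the recipients in non-alphabetical order and see which one is called first. The plain-Java sketch below is only an illustration of how the choice of collection can decide the call order. It is not taken from the RecipientListRouter source, and the class and method names (RecipientOrderDemo, iterationOrder) are hypothetical:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RecipientOrderDemo {

    // Registers the channels in deliberately non-alphabetical order and
    // returns the order in which the given map iterates over them.
    // Hypothetical helper: it does not mirror any Spring Integration class.
    static List<String> iterationOrder(Map<String, String> recipients) {
        recipients.put("request.ch.2", "true");
        recipients.put("request.ch.1", "true");
        return new ArrayList<>(recipients.keySet());
    }

    public static void main(String[] args) {
        // A sorted map iterates by key, so registration order is lost.
        System.out.println(iterationOrder(new TreeMap<>()));
        // An insertion-ordered map keeps the order of registration.
        System.out.println(iterationOrder(new LinkedHashMap<>()));
    }
}
```

If swapping the two recipient(...) calls in requestFlow() does not change which channel is called first, then the order is not coming from the order of registration.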
* EDIT: below is a solution that places an Aggregator between JMS Outbound/Inbound Adapters (without a Messaging Gateway) *
Integration Config:
@Configuration
@EnableIntegration
public class IntegrationConfig {

    ...

    @Bean
    public IntegrationFlow reqFlow() {
        return IntegrationFlows
                .from("request.ch")
                .enrichHeaders(e -> e.headerChannelsToString())
                .enrichHeaders(e -> e.headerExpression(IntegrationMessageHeaderAccessor.CORRELATION_ID,
                        "headers['" + MessageHeaders.REPLY_CHANNEL + "']"))
                .routeToRecipients(r -> {
                    r.ignoreSendFailures(false);
                    r.recipient("jms.req.ch", "true");
                    r.recipient("jms.agg.ch", "true");
                })
                .get();
    }

    @Bean
    public IntegrationFlow jmsReqFlow() {
        return IntegrationFlows
                .from("jms.req.ch")
                .handle(Jms.outboundAdapter(cachingConnectionFactory)
                        .destination("TEST_REQUEST_CH"))
                .get();
    }

    @Bean
    public IntegrationFlow jmsPostReqFlow() {
        return IntegrationFlows
                .from("jms.req.ch")
                .handle("postSendService", "postSendProcess")
                .get();
    }

    @Bean
    public IntegrationFlow jmsResFlow() {
        return IntegrationFlows
                .from(Jms.inboundAdapter(cachingConnectionFactory)
                                .destination("TEST_RESPONSE_CH"),
                        c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(10)))
                .channel("jms.agg.ch")
                .get();
    }

    @Bean
    public IntegrationFlow jmsAggFlow() {
        return IntegrationFlows
                .from("jms.agg.ch")
                .aggregate(a -> {
                    a.outputProcessor(g -> {
                        // Build the output from the last message in the group,
                        // restoring the reply channel header from the first one.
                        List<Message<?>> l = new ArrayList<Message<?>>(g.getMessages());
                        Message<?> firstMessage = l.get(0);
                        Message<?> lastMessage = (l.size() > 1) ? l.get(l.size() - 1) : firstMessage;
                        Message<?> messageOut = MessageBuilder.fromMessage(lastMessage)
                                .setHeader(MessageHeaders.REPLY_CHANNEL,
                                        (String) firstMessage.getHeaders().getReplyChannel())
                                .build();
                        return messageOut;
                    });
                    // Release once the group holds two messages.
                    a.releaseStrategy(g -> g.size() == 2);
                    a.groupTimeout(45000);
                    a.sendPartialResultOnExpiry(false);
                    a.discardChannel("jms.agg.timeout.ch");
                }, null)
                .channel("response.ch")
                .get();
    }

    @Bean
    public IntegrationFlow jmsAggTimeoutFlow() {
        return IntegrationFlows
                .from("jms.agg.timeout.ch")
                .handle(Message.class, (m, h) -> new ErrorMessage(new MessageTimeoutException(m), h))
                .channel("error.ch")
                .get();
    }

}
Cheers, PM
Upvotes: 3
Views: 1131
Reputation: 121272
Hmm, it looks like it. This really is a bug in the DslRecipientListRouter logic: https://github.com/spring-projects/spring-integration-java-dsl/issues/9
It will be fixed soon and released within a couple of days.
Thank you for pointing that out!
BTW, your logic isn't quite correct: even when we fix that RecipientListRouter, the second recipient will receive the same request message only after the JmsOutboundGateway has received the reply, not just after the request has been sent to the request queue.
It is a blocking request-reply process, and there is no hook in the JmsOutboundGateway to get a point between the request and the reply.
Is that OK for you?
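To make the ordering concrete, here is a minimal plain-Java sketch of the sequence. It assumes the recipients are invoked one after another on the caller's thread, and it uses hypothetical stand-ins (SequentialDispatchDemo, dispatch, the two lambdas) rather than the real Spring Integration classes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SequentialDispatchDemo {

    // Hypothetical stand-in for a recipient list that calls each recipient
    // in turn on the caller's thread.
    static void dispatch(String message, List<Consumer<String>> recipients) {
        for (Consumer<String> recipient : recipients) {
            recipient.accept(message);
        }
    }

    // Runs one dispatch and returns the events in the order they happened.
    static List<String> run() {
        List<String> events = new ArrayList<>();

        // Stand-in for the blocking gateway: it returns only once the reply is in.
        Consumer<String> gateway = m -> {
            events.add("request sent");
            events.add("reply received");
        };
        // Stand-in for the second recipient (the post-send handler).
        Consumer<String> postSend = m -> events.add("post-send handler called");

        List<Consumer<String>> recipients = new ArrayList<>();
        recipients.add(gateway);
        recipients.add(postSend);
        dispatch("payload", recipients);
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch the post-send handler runs only after "reply received", because the first recipient does not return control until the whole request-reply exchange has finished.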
Upvotes: 1