Reputation: 1451
I have a Spring Integration project where I send and receive messages from a RabbitMQ queue.
The order in which the system publishes messages is OK, but the order in which it afterwards receives messages is incorrect.
So I found this paragraph (https://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-strict-ordering) and configured the listener with simpleMessageListenerContainer.setPrefetchCount(1).
We ran some tests and it worked well. However, after a week or so the same kind of ordering issue came back.
Let me explain a bit more:
I have two flows (IntegrationFlows) in one Spring Integration application.
The first IntegrationFlow creates messages and publishes each one to a RabbitMQ queue.
Just before publishing, it logs every message, and I can confirm that the sequenceNumber increments as expected (in my case 1,2,3,4,5,6,7,8,9,10,11).
The second flow consumes these published messages. Right after each message is received, the flow logs it again. Here I found that the sequenceNumber does not increment as expected (in my case 1,3,5,7,2,4,6,8,9,10,11).
It is very important for this application to handle messages in the right order.
When I looked into RabbitMQ's UI I found the following (most of it is what I expected):
I didn't expect 3 channels within my application's connection. I did not configure that myself; maybe Spring Integration / AMQP did that for me.
Now, I think that another channel might become active and that this causes the ordering problem. But I cannot find this in the logging or in the configuration.
Pieces of code:
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(final ConnectionFactory connectionFactory,
        final Jackson2JsonMessageConverter jackson2MessageConverter,
        final MethodInterceptor retryInterceptor) {
    SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
    simpleMessageListenerContainer.setMessageConverter(jackson2MessageConverter);
    simpleMessageListenerContainer.setAdviceChain(retryInterceptor);
    // force FIFO ordering (https://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-strict-ordering):
    simpleMessageListenerContainer.setPrefetchCount(1);
    simpleMessageListenerContainer.setConcurrency();
    return simpleMessageListenerContainer;
}
@Bean
public IntegrationFlow routeIncomingAmqpMessagesFlow(final SimpleMessageListenerContainer simpleMessageListenerContainer,
        final Queue q1, final Queue q2, final Queue q3,
        final Queue q4, final Queue q5,
        final Queue q6) {
    simpleMessageListenerContainer.setQueues(q1, q2, q3, q4, q5, q6);
    return IntegrationFlows.from(
            Amqp.inboundAdapter(simpleMessageListenerContainer)
                .messageConverter(jackson2MessageConverter))
        .log(LoggingHandler.Level.DEBUG, "com.my.thing")
        .headerFilter(MyMessageHeaders.QUEUE_ROUTING_KEY)
        .route(router())
        .get();
}
private HeaderValueRouter router() {
    HeaderValueRouter router = new HeaderValueRouter(AmqpHeaders.CONSUMER_QUEUE);
    router.setChannelMapping(AmqpConfiguration.Q1_NAME, Q1_CHANNEL);
    router.setChannelMapping(AmqpConfiguration.Q2_NAME, Q2_CHANNEL);
    router.setChannelMapping(AmqpConfiguration.Q3_NAME, Q3_CHANNEL);
    router.setChannelMapping(AmqpConfiguration.Q4_NAME, Q4_CHANNEL);
    router.setChannelMapping(AmqpConfiguration.Q5_NAME, Q5_CHANNEL);
    router.setChannelMapping(AmqpConfiguration.Q6_NAME, Q6_CHANNEL);
    router.setResolutionRequired(false);
    router.setDefaultOutputChannelName("errorChannel");
    return router;
}
publish:
@Bean
public IntegrationFlow prepareForUpload(final Handler1 handler1) {
    BinaryFileSplitter binaryFileSplitter = new BinaryFileSplitter(true);
    binaryFileSplitter.setChunkSize(chunksize);
    return IntegrationFlows
        .from(aFlow)
        .handle(handler1)
        .split(binaryFileSplitter)
        .log(LoggingHandler.Level.TRACE, "com.my.log.identifyer")
        // Send message to the correct AMQP queue after successful processing
        .enrichHeaders(h -> h.header(QUEUE_ROUTING_KEY, AmqpConfiguration.Q4_NAME))
        .channel(MyChannels.AMQP_OUTPUT)
        .get();
}
@Bean
public IntegrationFlow outputAmqpFlow(final AmqpTemplate amqpTemplate, final UpdateDb updateDb) {
    return IntegrationFlows.from(MyChannels.AMQP_OUTPUT)
        .log(LoggingHandler.Level.DEBUG, "com.my.log.identify")
        .handle(updateDb)
        .handle(Amqp.outboundAdapter(amqpTemplate)
            .exchangeName(AmqpConfiguration.THE_TOPIC_EXCHANGE)
            .routingKeyExpression("headers['queueRoutingKey']"))
        .get();
}
receive:
@Bean
public IntegrationFlow handleReceivedMessages() {
    return IntegrationFlows
        .from(Q4_CHANNEL)
        .log(LoggingHandler.Level.DEBUG, "com.my.log.identifyer")
        .handle(..)
        .aggregate(a -> a.releaseStrategy(new ChunkReleaseStrategy()))
        .transform(..)
        ....(..)..
        ...
Upvotes: 0
Views: 677
Reputation: 174729
As discussed in the documentation you pointed to, you need to add a BoundRabbitChannelAdvice to the splitter so that the entire downstream flow uses the same channel.
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
    return IntegrationFlows.from(Gate.class)
        .split(s -> s.delimiters(",")
            .advice(new BoundRabbitChannelAdvice(template)))
        .<String, String>transform(String::toUpperCase)
        .handle(Amqp.outboundAdapter(template).routingKey("rk"))
        .get();
}
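To see why publishing over more than one channel can produce the pattern from the question, here is a toy simulation in plain Java. It does not talk to RabbitMQ. It only assumes that each channel keeps its own messages in order while nothing orders messages across channels, and it picks one possible interleaving: the publisher alternates between channels and the broker happens to process one channel's backlog before the next. The class and method names (ChannelOrderingDemo, publishAndDrain) are made up for this illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model only: each "channel" is an in-memory FIFO buffer, no broker involved.
public class ChannelOrderingDemo {

    // Publishes sequence numbers 1..count round-robin over `channels` buffers,
    // then drains one buffer completely before moving on to the next.
    public static List<Integer> publishAndDrain(int count, int channels) {
        List<Queue<Integer>> buffers = new ArrayList<>();
        for (int i = 0; i < channels; i++) {
            buffers.add(new ArrayDeque<>());
        }
        // Order is preserved inside each buffer, like messages on a single channel.
        for (int seq = 1; seq <= count; seq++) {
            buffers.get((seq - 1) % channels).add(seq);
        }
        // Assumed interleaving: the whole backlog of one channel arrives first.
        List<Integer> arrival = new ArrayList<>();
        for (Queue<Integer> buffer : buffers) {
            while (!buffer.isEmpty()) {
                arrival.add(buffer.poll());
            }
        }
        return arrival;
    }

    public static void main(String[] args) {
        System.out.println(publishAndDrain(8, 2)); // [1, 3, 5, 7, 2, 4, 6, 8]
        System.out.println(publishAndDrain(8, 1)); // [1, 2, 3, 4, 5, 6, 7, 8]
    }
}
```

With two channels the arrival order matches the first eight sequence numbers logged in the question; with a single channel the order is preserved, which is the effect the advice is meant to achieve by binding the sends to one channel.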
Upvotes: 0