codesmith

Reputation: 1451

How to enforce strict ordering for a RabbitMQ message listener in Spring Integration?

I have a Spring Integration project where I send and receive messages from a RabbitMQ queue.

The order in which the system publishes messages is OK, but the order in which it afterwards receives messages is incorrect.

So I found this paragraph (https://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-strict-ordering) and configured the listener with: simpleMessageListenerContainer.setPrefetchCount(1);.

We ran some tests and it worked well. However, after a week or so the same kind of ordering issue came back.

Let me explain a bit more:

I have two flows (IntegrationFlows) in one Spring Integration application.

The first IntegrationFlow creates messages and publishes each one to a RabbitMQ queue.

Just before the publishing it logs every message and I can confirm that the sequenceNumber increments as expected (in my case 1,2,3,4,5,6,7,8,9,10,11).

The second flow then consumes these published messages. Right after each message is received, the flow logs it again. Here I found that the sequenceNumber does not increment as expected (in my case 1,3,5,7,2,4,6,8,9,10,11).

It is very important for this application to handle messages in the right order.

When I looked into RabbitMQ's management UI I found the following (most of it is what I expected):

I didn't expect 3 channels within my application's connection. I did not configure that myself, so maybe Spring Integration / Spring AMQP did that for me.

Now, I think that another channel might become active and that this causes the ordering problem. But I cannot find any sign of this in the logging or in the configuration.
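The suspicion above can be illustrated with a toy model in plain Java, with no RabbitMQ involved. It is only a sketch under strong assumptions: each "channel" is modelled as its own FIFO buffer, messages are published round-robin across the channels, and the buffers are drained one after the other. The class and method names are hypothetical and a real broker does not behave this simply. The point is only that ordering holds within a channel, not across channels.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model (hypothetical, not RabbitMQ code): per-channel FIFO buffers.
public class ChannelOrderingSketch {

    // Publishes sequence numbers 1..count round-robin over `channels` buffers,
    // then drains the buffers one after the other and returns the arrival order.
    static List<Integer> deliver(int count, int channels) {
        List<Queue<Integer>> buffers = new ArrayList<>();
        for (int c = 0; c < channels; c++) {
            buffers.add(new ArrayDeque<>());
        }
        for (int seq = 1; seq <= count; seq++) {
            buffers.get((seq - 1) % channels).add(seq);
        }
        List<Integer> received = new ArrayList<>();
        for (Queue<Integer> buffer : buffers) {
            while (!buffer.isEmpty()) {
                received.add(buffer.poll()); // FIFO within one channel
            }
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(deliver(8, 1)); // prints [1, 2, 3, 4, 5, 6, 7, 8]
        System.out.println(deliver(8, 2)); // prints [1, 3, 5, 7, 2, 4, 6, 8]
    }
}
```

With two channels the model happens to reproduce the first eight sequence numbers from my log, which is at least consistent with the publisher using more than one channel.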

Pieces of code:

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(final ConnectionFactory connectionFactory,
                                                                         final Jackson2JsonMessageConverter jackson2MessageConverter,
                                                                         final MethodInterceptor retryInterceptor) {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        simpleMessageListenerContainer.setMessageConverter(jackson2MessageConverter);
        simpleMessageListenerContainer.setAdviceChain(retryInterceptor);
        // force FIFO ordering (https://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-strict-ordering):
        simpleMessageListenerContainer.setPrefetchCount(1);
        // a single consumer; more than one consumer would break strict ordering
        simpleMessageListenerContainer.setConcurrency("1");
        return simpleMessageListenerContainer;
    }

    @Bean
    public IntegrationFlow routeIncomingAmqpMessagesFlow(final SimpleMessageListenerContainer simpleMessageListenerContainer,
                                                         final Queue q1, final Queue q2, final Queue q3,
                                                         final Queue q4, final Queue q5,
                                                         final Queue q6) {
        simpleMessageListenerContainer.setQueues(q1, q2, q3, q4, q5, q6);
        return IntegrationFlows.from(
                Amqp.inboundAdapter(simpleMessageListenerContainer)
                        .messageConverter(jackson2MessageConverter))
                .log(LoggingHandler.Level.DEBUG, "com.my.thing")
                .headerFilter(MyMessageHeaders.QUEUE_ROUTING_KEY)
                .route(router())
                .get();
    }

    private HeaderValueRouter router() {
        HeaderValueRouter router = new HeaderValueRouter(AmqpHeaders.CONSUMER_QUEUE);
        router.setChannelMapping(AmqpConfiguration.Q1_NAME, Q1_CHANNEL);
        router.setChannelMapping(AmqpConfiguration.Q2_NAME, Q2_CHANNEL);
        router.setChannelMapping(AmqpConfiguration.Q3_NAME, Q3_CHANNEL);
        router.setChannelMapping(AmqpConfiguration.Q4_NAME, Q4_CHANNEL);
        router.setChannelMapping(AmqpConfiguration.Q5_NAME, Q5_CHANNEL);
        router.setChannelMapping(AmqpConfiguration.Q6_NAME, Q6_CHANNEL);
        router.setResolutionRequired(false);
        router.setDefaultOutputChannelName("errorChannel");
        return router;
    }

publish:

    @Bean
    public IntegrationFlow prepareForUpload(final Handler1 handler1) {
        BinaryFileSplitter binaryFileSplitter = new BinaryFileSplitter(true);
        binaryFileSplitter.setChunkSize(chunksize);

        return IntegrationFlows
                .from(aFlow)
                .handle(handler1)
                .split(binaryFileSplitter)
                .log(LoggingHandler.Level.TRACE, "com.my.log.identifyer")
                // Send message to the correct AMQP queue after successful processing
                .enrichHeaders(h -> h.header(QUEUE_ROUTING_KEY, AmqpConfiguration.Q4_NAME))
                .channel(MyChannels.AMQP_OUTPUT)
                .get();
    }

    @Bean
    public IntegrationFlow outputAmqpFlow(final AmqpTemplate amqpTemplate, final UpdateDb updateDb) {
        return IntegrationFlows.from(MyChannels.AMQP_OUTPUT)
                .log(LoggingHandler.Level.DEBUG, "com.my.log.identify")
                .handle(updateDb)
                .handle(Amqp.outboundAdapter(amqpTemplate)
                        .exchangeName(AmqpConfiguration.THE_TOPIC_EXCHANGE)
                        .routingKeyExpression("headers['queueRoutingKey']"))
                .get();
    }

receive:

    @Bean
    public IntegrationFlow handleReceivedMessages() {
        return IntegrationFlows
                .from(Q4_CHANNEL)
                .log(LoggingHandler.Level.DEBUG, "com.my.log.identifyer")
                .handle(..)
                .aggregate(a -> a.releaseStrategy(new ChunkReleaseStrategy()))
                .transform(..)
                ....(..)..
                ...

Upvotes: 0

Views: 677

Answers (1)

Gary Russell

Reputation: 174729

As discussed in the documentation you pointed to, you need to add a BoundRabbitChannelAdvice to the splitter so that the whole downstream flow uses the same channel.

        @Bean
        public IntegrationFlow flow(RabbitTemplate template) {
            return IntegrationFlows.from(Gate.class)
                    .split(s -> s.delimiters(",")
                            .advice(new BoundRabbitChannelAdvice(template)))
                    .<String, String>transform(String::toUpperCase)
                    .handle(Amqp.outboundAdapter(template).routingKey("rk"))
                    .get();
        }
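Applied to a publishing flow shaped like the one in the question, this might look roughly as follows. It is a sketch, not a drop-in fix, and it makes several assumptions: the splitter bean extends `AbstractMessageSplitter`, and the channel name `uploadInput`, the routing key `q4` and the exchange name `the.topic.exchange` are hypothetical placeholders. The advice and the outbound adapter must use the same `RabbitTemplate`, and everything between the splitter and the adapter has to run on the splitter's thread (direct channels only, no executor or queue channels in between). Otherwise the sends are not bound to one channel.

```java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.amqp.support.BoundRabbitChannelAdvice;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.splitter.AbstractMessageSplitter;

@Configuration
public class OrderedPublishConfig {

    @Bean
    public IntegrationFlow orderedPublishFlow(RabbitTemplate rabbitTemplate,
                                              AbstractMessageSplitter chunkSplitter) {
        return IntegrationFlows.from("uploadInput") // hypothetical input channel
                // the advice binds all sends made downstream of the splitter
                // (on this thread) to one RabbitMQ channel
                .split(chunkSplitter,
                        e -> e.advice(new BoundRabbitChannelAdvice(rabbitTemplate)))
                // hypothetical routing key value
                .enrichHeaders(h -> h.header("queueRoutingKey", "q4"))
                .handle(Amqp.outboundAdapter(rabbitTemplate) // same template as the advice
                        .exchangeName("the.topic.exchange")  // hypothetical exchange name
                        .routingKeyExpression("headers['queueRoutingKey']"))
                .get();
    }
}
```

On the consuming side, the prefetch count of 1 and a single consumer, as already configured in the question, remain necessary.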

Upvotes: 0
