Sanal M

Reputation: 359

Reading messages from a RabbitMQ queue at an interval is not working

What I am trying to achieve is to read messages from a RabbitMQ queue every 15 minutes. From the documentation, it appears that I can use the "receiveTimeout" property to set the interval.

Polling Consumer

The AmqpTemplate itself can be used for polled Message reception. By default, if no message is available, null is returned immediately. There is no blocking. Starting with version 1.5, you can set a receiveTimeout, in milliseconds, and the receive methods block for up to that long, waiting for a message.

But when I tried implementing it with Spring Integration, the receiveTimeout did not work as I expected.

My test code is given below.

    @Bean
    Queue createMessageQueue() {
        return new Queue(RetryQueue, false);
    }

    @Bean
    public SimpleMessageListenerContainer QueueMessageListenerContainer(ConnectionFactory connectionFactory) {
        final SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(
                connectionFactory);
        messageListenerContainer.setQueueNames(RetryQueue);
        messageListenerContainer.setReceiveTimeout(900000);
        return messageListenerContainer;
    }

    @Bean
    public AmqpInboundChannelAdapter inboundQueueChannelAdapter(
            @Qualifier("QueueMessageListenerContainer") AbstractMessageListenerContainer messageListenerContainer) {
        final AmqpInboundChannelAdapter amqpInboundChannelAdapter = new AmqpInboundChannelAdapter(
                messageListenerContainer);
        amqpInboundChannelAdapter.setOutputChannelName("channelRequestFromQueue");
        return amqpInboundChannelAdapter;
    }

    @ServiceActivator(inputChannel = "channelRequestFromQueue")
    public void activatorRequestFromQueue(Message<String> message) {
        System.out.println("Message: " + message.getPayload() + ", received at: " + LocalDateTime.now());
    }

I am getting the payload logged in the console in near real time. Can anyone help? How long will the consumer stay active once it starts?

UPDATE

The IntegrationFlow I used to retrieve messages from the queue at an interval:

    @Bean
    public IntegrationFlow inboundIntegrationFlowPaymentRetry() {
        return IntegrationFlows
                .from(Amqp.inboundPolledAdapter(connectionFactory, RetryQueue),
                        e -> e.poller(Pollers.fixedDelay(20_000).maxMessagesPerPoll(-1)).autoStartup(true))
                .handle(message -> {
                    channelRequestFromQueue()
                            .send(MessageBuilder.withPayload(message.getPayload()).copyHeaders(message.getHeaders())
                                    .setHeader(IntegrationConstants.QUEUED_MESSAGE, message).build());
                }).get();
    }

Upvotes: 0

Views: 1241

Answers (1)

Gary Russell

Reputation: 174484

The Polling Consumer documentation is from the Spring AMQP documentation about the `RabbitTemplate`, and has nothing to do with the listener container or Spring Integration.

https://docs.spring.io/spring-amqp/docs/current/reference/html/#polling-consumer

Spring Integration's adapter is message-driven, and you will get messages whenever they are available.

To get messages on-demand, you need to call the RabbitTemplate on whatever interval you want.
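
A minimal sketch of that on-demand approach, using only the JDK so it can be run on its own. The class name IntervalPoller and its drain and start methods are hypothetical names made up for this illustration. The receiveOne supplier stands in for a template call such as rabbitTemplate.receiveAndConvert(queueName), which returns null when the queue is empty.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical helper: polls a message source on a fixed schedule. */
class IntervalPoller {

    /**
     * Calls receiveOne until it returns null (taken to mean "queue empty"),
     * passing each message to the handler.
     * Returns the number of messages handled in this pass.
     */
    static <T> int drain(Supplier<T> receiveOne, Consumer<T> handler) {
        int count = 0;
        T message;
        while ((message = receiveOne.get()) != null) {
            handler.accept(message);
            count++;
        }
        return count;
    }

    /**
     * Runs drain() once immediately, then again each time the given delay
     * has elapsed after the previous pass finished.
     * The caller is responsible for shutting down the returned scheduler.
     */
    static <T> ScheduledExecutorService start(
            Supplier<T> receiveOne, Consumer<T> handler, long delay, TimeUnit unit) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                drain(receiveOne, handler);
            } catch (RuntimeException e) {
                // An uncaught exception would cancel all further runs, so log and continue.
                System.err.println("Poll failed: " + e);
            }
        }, 0, delay, unit);
        return scheduler;
    }
}
```

In the setup from the question, the call might look like IntervalPoller.start(() -> rabbitTemplate.receiveAndConvert(RetryQueue), handler, 15, TimeUnit.MINUTES), assuming a RabbitTemplate bean named rabbitTemplate is available. With the template's default receiveTimeout, each receive returns immediately, so one pass ends as soon as the queue is empty.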

Upvotes: 1
