Ray

Reputation: 4879

How can I control the rate at which Spring receives from a queue?

I am using Spring's message-driven POJO framework (and DefaultMessageListenerContainer in particular) to listen to several queues and topics.

In the case of one particular queue, I need to slow the rate at which I drain the queue to roughly one message every five minutes. The actual processing of each message is a sub-second operation, but I would like the listener to sit idle for some time between messages.

I have created a bit of a hack, but it is decidedly sub-optimal: I set the max concurrency to 1 and added a Thread.sleep(..) after processing each message. Instead, I would like to find a way to make the DefaultMessageListenerContainer wait between attempts to receive, rather than making the handler do the waiting during what would otherwise be message processing.

I had considered whether a ScheduledExecutor would help, but I realize that the throttling would need to be done where the tasks are produced. Is there perhaps some method of DefaultMessageListenerContainer that I could override to accomplish what I'm after?

Upvotes: 12

Views: 4267

Answers (3)

Brad

Reputation: 15909

Here's a solution that extends DefaultMessageListenerContainer to provide the throttling functionality. The advantage of this approach is that Thread.sleep() is not called within onMessage(); sleeping there would hold a transaction open for longer than necessary if a transaction is in play (as it is in the example below). Instead, the call to Thread.sleep() occurs after the transaction has been committed. A limitation of this throttling approach is that it can only support one consumer thread, hence the name ThrottlingSingleConsumerMessageListenerContainer.

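// Imports for the three classes below (Spring 5.x with javax.jms; SLF4J assumed for logging).
// In a real project each public class lives in its own file with its own subset of these.
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.util.Assert;

@Configuration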
@EnableJms
@EnableTransactionManagement
public class Config
{
    private static final long THROTTLE_FIVE_SECONDS = 5_000;

    @Bean
    public DefaultMessageListenerContainer defaultMessageListenerContainer(
            ConnectionFactory connectionFactory,
            PlatformTransactionManager transactionManager,
            MyJmsListener myJmsListener)
    {
        // Single-consumer container that pauses after each received message
        DefaultMessageListenerContainer dmlc = new ThrottlingSingleConsumerMessageListenerContainer(THROTTLE_FIVE_SECONDS);
        dmlc.setConnectionFactory(connectionFactory);
        dmlc.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
        dmlc.setSessionTransacted(true);
        dmlc.setTransactionManager(transactionManager);
        dmlc.setDestinationName("QUEUE.IN");
        dmlc.setMessageListener(myJmsListener);
        return dmlc;
    }
}

@Component
public class MyJmsListener implements MessageListener
{
    @Override
    public void onMessage(Message message)
    {
        // process the message
    }
}

public class ThrottlingSingleConsumerMessageListenerContainer extends DefaultMessageListenerContainer
{
    private static final Logger log = LoggerFactory.getLogger(ThrottlingSingleConsumerMessageListenerContainer.class);

    private final long delayMillis;

    public ThrottlingSingleConsumerMessageListenerContainer(long delayMillis)
    {
        this.delayMillis = delayMillis;
        super.setMaxConcurrentConsumers(1);
    }

    @Override
    protected boolean receiveAndExecute(Object invoker, @Nullable Session session, @Nullable MessageConsumer consumer) throws JMSException
    {
        boolean messageReceived = super.receiveAndExecute(invoker, session, consumer);

        if (messageReceived) {
            log.info("Sleeping for {} millis", delayMillis);
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                log.warn("Sleeping thread has been interrupted");
                Thread.currentThread().interrupt();
            }
        }

        return messageReceived;
    }

    @Override
    public void setMaxConcurrentConsumers(int maxConcurrentConsumers)
    {
        // Validate before delegating so an invalid value is never stored on the container
        Assert.isTrue(maxConcurrentConsumers <= 1, "Throttling does not support maxConcurrentConsumers > 1");
        super.setMaxConcurrentConsumers(maxConcurrentConsumers);
    }

    @Override
    public void setConcurrency(String concurrency)
    {
        super.setConcurrency(concurrency);
        Assert.isTrue(getMaxConcurrentConsumers() <= 1, "Throttling does not support maxConcurrentConsumers > 1");
    }
}

This has been tested on org.springframework 5.x but should also run on earlier versions, provided the @Nullable annotations (introduced in Spring 5) are removed.
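
To make the control flow of the receiveAndExecute override easier to see in isolation, here is a minimal JDK-only sketch. It is not Spring code: a BlockingQueue stands in for the JMS destination, a Consumer stands in for the listener, and the class and method names (ThrottledReceiveSketch, throttledReceiveAndExecute) are made up for illustration. The point it shows is the same as in the container above: the pause happens after the handler has returned, and only when a message was actually consumed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for the container's receive loop; illustration only.
final class ThrottledReceiveSketch {

    // Waits up to receiveTimeoutMillis for one message and passes it to the handler.
    // Returns true only if a message was actually received and handled.
    static <T> boolean receiveAndExecute(BlockingQueue<T> queue,
                                         Consumer<? super T> handler,
                                         long receiveTimeoutMillis) {
        try {
            T message = queue.poll(receiveTimeoutMillis, TimeUnit.MILLISECONDS);
            if (message == null) {
                return false; // nothing arrived within the timeout
            }
            handler.accept(message);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    // Delegates to receiveAndExecute, then sleeps only if a message was consumed.
    // The sleep runs after the handler has finished, not inside it.
    static <T> boolean throttledReceiveAndExecute(BlockingQueue<T> queue,
                                                  Consumer<? super T> handler,
                                                  long receiveTimeoutMillis,
                                                  long delayMillis) {
        boolean messageReceived = receiveAndExecute(queue, handler, receiveTimeoutMillis);
        if (messageReceived) {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return messageReceived;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("first");
        queue.add("second");
        List<String> handled = new ArrayList<>();

        // Drain the queue, pausing 100 ms after each handled message.
        while (throttledReceiveAndExecute(queue, handled::add, 10, 100)) {
            System.out.println("handled so far: " + handled);
        }
    }
}
```

An empty poll returns immediately after the receive timeout without sleeping, so an idle queue is not slowed down further; only successful receives are spaced out.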

Upvotes: 1

andreadi

Reputation: 1965

Depending on the provider of the queue, you may be able to set a max rate for consumers that consume its queues.

For example, in HornetQ you set this on the connection factory using consumer-max-rate.

Upvotes: 4

Matt Aldridge

Reputation: 2075

An alternative to modifying the behavior of your consumer would be to make use of Apache Camel to delay the messages on that one specific queue.

http://camel.apache.org/delayer.html describes the functionality of the Camel Delayer pattern. So for example:

<route>
    <from uri="jms:YOURQUEUE"/>
    <delay>
        <constant>1000</constant>
    </delay>
    <to uri="jms:DELAYEDQUEUE"/>
</route>

You would then consume from DELAYEDQUEUE instead, and every message would be delayed by 1 second (the constant is in milliseconds).

Upvotes: 2
