Reputation: 4879
I am using Spring's message-driven POJO framework (and DefaultMessageListenerContainer
in particular) to listen to several queues and topics.
In the case of one particularly queue, there is a need to slow the rate at which I drain the queue, on the order of one message every five minutes. The actual processing of the messages is a sub-second operation, but I would like the listener to sit idle for some time in between messages.
I have created a bit of a hack, but it is decidedly sub-optimal: What I've done is to set the max concurrency to 1 and add a Thread.sleep(..)
after processing each message. I would like to find a way instead to use the DefaultMessageListenerContainer
to wait between attempts to receive, rather than causing the handler to do the waiting during the would-be processing of a message.
I had considered if there was a ScheduledExecutor
that would help, but I realize that the throttling would need to be done where the tasks are produced. Is there perhaps some method from DefaultMessageListenerContainer
that I could override to accomplish what I'm after?
Upvotes: 12
Views: 4267
Reputation: 15909
Here's a solution that extends DefaultMessageListenerContainer
to provide the throttling functionality. The advantage of this approach is that Thread.sleep()
is not being called within onMessage()
. This would hold a Transaction open for longer than necessary if a Transaction is in play (as configured in this example below). The call to Thread.sleep() occurs after the transaction has been committed. A limitation to implementing this throttling feature is that we can only support one consumer thread, hence the name ThrottlingSingleConsumerMessageListenerContainer
.
@Configuration
@EnableJms
@EnableTransactionManagement
public class Config
{
private static final long THROTTLE_FIVE_SECONDS = 5_000;
@Bean
public DefaultMessageListenerContainer defaultMessageListenerContainer(
ConnectionFactory connectionFactory,
PlatformTransactionManager transactionManager,
MyJmsListener myJmsListner)
{
DefaultMessageListenerContainer dmlc = new ThrottlingSingleConsumerMessageListenerContainer(THROTTLE_FIVE_SECONDS);
dmlc.setConnectionFactory(connectionFactory);
dmlc.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
dmlc.setSessionTransacted(true);
dmlc.setTransactionManager(transactionManager);
dmlc.setDestinationName("QUEUE.IN");
dmlc.setMessageListener(myJmsListner);
return dmlc;
}
}
@Component
public class MyJmsListener implements MessageListener
{
@Override
public void onMessage(Message message)
{
// process the message
}
}
public class ThrottlingSingleConsumerMessageListenerContainer extends DefaultMessageListenerContainer
{
private static final Logger log = LoggerFactory.getLogger(ThrottlingSingleConsumerMessageListenerContainer.class);
private final long delayMillis;
public ThrottlingSingleConsumerMessageListenerContainer(long delayMillis)
{
this.delayMillis = delayMillis;
super.setMaxConcurrentConsumers(1);
}
@Override
protected boolean receiveAndExecute(Object invoker, @Nullable Session session, @Nullable MessageConsumer consumer) throws JMSException
{
boolean messageReceived = super.receiveAndExecute(invoker, session, consumer);
if (messageReceived) {
log.info("Sleeping for {} millis", delayMillis);
try {
Thread.sleep(delayMillis);
} catch (InterruptedException e) {
log.warn("Sleeping thread has been interrupted");
Thread.currentThread().interrupt();
}
}
return messageReceived;
}
@Override
public void setMaxConcurrentConsumers(int maxConcurrentConsumers)
{
super.setMaxConcurrentConsumers(maxConcurrentConsumers);
Assert.isTrue(getMaxConcurrentConsumers() <= 1, "Throttling does not support maxConcurrentConsumers > 1");
}
@Override
public void setConcurrency(String concurrency)
{
super.setConcurrency(concurrency);
Assert.isTrue(getMaxConcurrentConsumers() <= 1, "Throttling does not support maxConcurrentConsumers > 1");
}
}
This has been tested on org.springframework 5.x but should run on earlier versions also.
Upvotes: 1
Reputation: 1965
Depending on the provider of the queue, you may be able to set a max rate for consumers that consume it's queues.
For example in hornetQ you set this in the connection factory using consumer-max-rate.
Upvotes: 4
Reputation: 2075
An alternative to modifying the behavior of your consumer would be to make use of Apache Camel to delay the messages on that one specific queue.
http://camel.apache.org/delayer.html describes the functionality of the Camel Delayer pattern. So for example:
<route>
<from uri="jms:YOURQUEUE"/>
<delay>
<constant>1000</constant>
</delay>
<to uri="jms:DELAYEDQUEUE"/>
</route>
Where you would then consume the DELAYEDQUEUE and all messages would be delayed by 1 second.
Upvotes: 2