We're using Spring Cloud AWS to interact with SQS. We use the @SqsListener annotation to pull messages off our queues. We have deletionPolicy = NEVER, which means we manually acknowledge all messages we pick off.
Our problem is that the SimpleMessageListenerContainer (which handles the processing of messages from a queue) waits for all worker threads to finish before picking further messages off the queue.
In other words, the container only polls for the next batch once every message in the current batch has finished processing.
The code responsible for this is in SimpleMessageListenerContainer.AsynchronousMessageListener:
    @Override
    public void run() {
        while (isQueueRunning()) {
            try {
                ReceiveMessageResult receiveMessageResult = getAmazonSqs().receiveMessage(this.queueAttributes.getReceiveMessageRequest());
                CountDownLatch messageBatchLatch = new CountDownLatch(receiveMessageResult.getMessages().size());
                for (Message message : receiveMessageResult.getMessages()) {
                    if (isQueueRunning()) {
                        MessageExecutor messageExecutor = new MessageExecutor(this.logicalQueueName, message, this.queueAttributes);
                        getTaskExecutor().execute(new SignalExecutingRunnable(messageBatchLatch, messageExecutor));
                    } else {
                        messageBatchLatch.countDown();
                    }
                }
                try {
                    messageBatchLatch.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            } catch (Exception e) {
                getLogger().warn("An Exception occurred while polling queue '{}'. The failing operation will be " +
                        "retried in {} milliseconds", this.logicalQueueName, getBackOffTime(), e);
                try {
                    //noinspection BusyWait
                    Thread.sleep(getBackOffTime());
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
Ideally, we'd like for the message listener to continually pick messages off the queue for processing.
We can't seem to implement our own MessageListenerContainer, since AbstractMessageListenerContainer is package-private.
Is there any way around this behavior?
Upvotes: 3
Views: 3456
Reputation: 2902
What blocks the polling thread is the messageBatchLatch.await() statement. It seems that just removing the latch would do it. Something like:
    @Override
    public void run() {
        while (isQueueRunning()) {
            try {
                ReceiveMessageResult receiveMessageResult = getAmazonSqs().receiveMessage(this.queueAttributes.getReceiveMessageRequest());
                for (Message message : receiveMessageResult.getMessages()) {
                    if (isQueueRunning()) {
                        MessageExecutor messageExecutor = new MessageExecutor(this.logicalQueueName, message, this.queueAttributes);
                        // No latch left to signal, so the SignalExecutingRunnable wrapper is dropped
                        // and the MessageExecutor is submitted directly.
                        getTaskExecutor().execute(messageExecutor);
                    }
                }
            } catch (Exception e) {
                getLogger().warn("An Exception occurred while polling queue '{}'. The failing operation will be " +
                        "retried in {} milliseconds", this.logicalQueueName, getBackOffTime(), e);
                try {
                    //noinspection BusyWait
                    Thread.sleep(getBackOffTime());
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
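This will work if your TaskExecutor implementation:
- Has a fixed-size thread pool
- Blocks when execute() is called and no threads are available.
Not every implementation behaves this way. Spring's ThreadPoolTaskExecutor, for example, by default queues tasks in an unbounded queue rather than blocking the caller, so it is worth checking yours. Without blocking, the polling loop would keep pulling messages off the queue faster than the workers can process them.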
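The two conditions above can be met with plain java.util.concurrent classes. The following is only a minimal sketch, not part of Spring Cloud AWS: BlockingExecutor is a hypothetical name, and it assumes you can hand a custom executor to the container (wrapping it in whatever TaskExecutor type your Spring version expects is left out here). A semaphore with one permit per worker makes execute() wait until a thread is free.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

// Hypothetical helper: a fixed-size pool whose execute() blocks the caller
// while every worker thread is busy.
final class BlockingExecutor implements Executor {

    private final ExecutorService delegate;
    private final Semaphore permits;

    BlockingExecutor(int poolSize) {
        this.delegate = Executors.newFixedThreadPool(poolSize);
        this.permits = new Semaphore(poolSize); // one permit per worker thread
    }

    @Override
    public void execute(Runnable task) {
        try {
            permits.acquire(); // blocks until a worker is free
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Interrupted while waiting for a free worker", e);
        }
        try {
            delegate.execute(() -> {
                try {
                    task.run();
                } finally {
                    permits.release(); // hand the slot back once the task ends
                }
            });
        } catch (RuntimeException e) {
            permits.release(); // the task was never started, so free the slot
            throw e;
        }
    }

    void shutdown() {
        delegate.shutdown();
    }
}
```

With an executor like this, the polling loop without the latch is throttled by the pool size: it fetches the next batch immediately, but each execute() call waits for a free worker instead of piling tasks up in memory.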
Upvotes: 0