GBC

Reputation: 3326

Spring Cloud SQS consumption blocking until all messages processed

We're using Spring Cloud AWS to interact with SQS. We use the @SqsListener annotation to pull messages off our queues. We have deletionPolicy = NEVER, which means we manually acknowledge all messages we pick off.

Our problem is that the SimpleMessageListenerContainer (which handles the processing of messages from a queue) waits for all worker threads to finish before picking further messages off the queue.

In other words, the next batch is not fetched until every message in the current batch has been processed.

The code responsible for this is in SimpleMessageListenerContainer.AsynchronousMessageListener:

@Override
public void run() {
    while (isQueueRunning()) {
        try {
            ReceiveMessageResult receiveMessageResult = getAmazonSqs().receiveMessage(this.queueAttributes.getReceiveMessageRequest());
            CountDownLatch messageBatchLatch = new CountDownLatch(receiveMessageResult.getMessages().size());
            for (Message message : receiveMessageResult.getMessages()) {
                if (isQueueRunning()) {
                    MessageExecutor messageExecutor = new MessageExecutor(this.logicalQueueName, message, this.queueAttributes);
                    getTaskExecutor().execute(new SignalExecutingRunnable(messageBatchLatch, messageExecutor));
                } else {
                    messageBatchLatch.countDown();
                }
            }
            try {
                messageBatchLatch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } catch (Exception e) {
            getLogger().warn("An Exception occurred while polling queue '{}'. The failing operation will be " +
                    "retried in {} milliseconds", this.logicalQueueName, getBackOffTime(), e);
            try {
                //noinspection BusyWait
                Thread.sleep(getBackOffTime());
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

Ideally, we'd like for the message listener to continually pick messages off the queue for processing.

We can't seem to implement our own MessageListenerContainer, since AbstractMessageListenerContainer is package-private.

Is there any way around this behavior?

Upvotes: 3

Views: 3456

Answers (1)

Filipe Felisbino

Reputation: 2902

What is holding the message polling thread is the messageBatchLatch.await() statement. It seems that just removing the latch would do it. Something like:

@Override
public void run() {
    while (isQueueRunning()) {
        try {
            ReceiveMessageResult receiveMessageResult = getAmazonSqs().receiveMessage(this.queueAttributes.getReceiveMessageRequest());
            for (Message message : receiveMessageResult.getMessages()) {
                if (isQueueRunning()) {
                    MessageExecutor messageExecutor = new MessageExecutor(this.logicalQueueName, message, this.queueAttributes);
                    // No latch to signal any more, so submit the MessageExecutor (a Runnable) directly
                    getTaskExecutor().execute(messageExecutor);
                }
            }
        } catch (Exception e) {
            getLogger().warn("An Exception occurred while polling queue '{}'. The failing operation will be " +
                    "retried in {} milliseconds", this.logicalQueueName, getBackOffTime(), e);
            try {
                //noinspection BusyWait
                Thread.sleep(getBackOffTime());
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

This will work if your TaskExecutor implementation:
- Has a fixed-size thread pool
- Blocks when execute() is called and no threads are available

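Not every implementation behaves this way, so it is worth checking yours. A plain java.util.concurrent.ThreadPoolExecutor, for example, either queues the task or rejects it with a RejectedExecutionException instead of blocking the caller. Without blocking, the polling thread would keep pulling messages off the queue faster than they can be processed.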
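If your executor does not block on its own, one way to get the two properties above is to wrap a fixed-size pool with a semaphore. This is only a minimal sketch: BlockingExecutor and availableWorkers() are hypothetical names, not part of Spring Cloud AWS, and it implements the plain java.util.concurrent.Executor interface (which Spring's TaskExecutor extends), so wiring it into the listener container would still need an adapter.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

// Hypothetical helper: execute() blocks the caller while every worker is busy.
class BlockingExecutor implements Executor {
    private final ExecutorService delegate;
    private final Semaphore permits;

    BlockingExecutor(int poolSize) {
        this.delegate = Executors.newFixedThreadPool(poolSize);
        // One permit per worker thread
        this.permits = new Semaphore(poolSize);
    }

    @Override
    public void execute(Runnable task) {
        // Wait here (on the polling thread) until a worker is free.
        // Assumption: ignoring interrupts while waiting is acceptable for this sketch.
        permits.acquireUninterruptibly();
        try {
            delegate.execute(() -> {
                try {
                    task.run();
                } finally {
                    // Free the slot even if the task throws
                    permits.release();
                }
            });
        } catch (RuntimeException e) {
            // The pool refused the task (e.g. after shutdown): give the permit back
            permits.release();
            throw e;
        }
    }

    // Number of workers that could accept a task right now
    int availableWorkers() {
        return permits.availablePermits();
    }

    void shutdown() {
        delegate.shutdown();
    }
}
```

With an executor like this, the polling loop naturally pauses once poolSize messages are in flight and resumes as soon as any one of them finishes, rather than waiting for the whole batch.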

Upvotes: 0
