stdunbar

Reading from an SQS queue with multiple threads

Note: this is a repost of a thread from here.

Hi all, I've got a process that handles the messages in a single SQS queue. The queue can hold many messages, and each message results in a database hit, so I wanted to use multiple threads to read from the queue.

The basic code for each thread is:

public void run() {
    while(true) {
        ReceiveMessageRequest rmr = new ReceiveMessageRequest(queueUrl)
               .withMaxNumberOfMessages(10)
               .withWaitTimeSeconds(3);
        List<Message> messages = sqsClient.receiveMessage(rmr).getMessages();
        // process messages
        // delete messages
    }
}

What I'm seeing is lots of duplicate messages across the threads. I know I should expect a few duplicates here and there, but it appears that each thread gets the same set of messages and, realistically, only one thread ever does much work.

Am I misunderstanding how to use the API, or am I doing something else wrong? The Javadocs indicate that the AmazonSQS client is thread-safe and, indeed, even creating a new AmazonSQS client for each thread changed nothing.

Any pointers would be most appreciated. My current idea for a fix is to have a single thread read from the SQS queue, put each message into something like a LinkedBlockingDeque, and have the workers read from that. But I suspect that implementation will not drain the queue as fast as I'd like.
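The single-reader/multi-worker design proposed above can be sketched as follows. This is a minimal, self-contained sketch, not a drop-in implementation: SQS is replaced by a hypothetical in-memory stand-in, `receiveBatch`, and the sentinel `POISON` is an illustrative shutdown mechanism, not part of any AWS API. The bounded `LinkedBlockingDeque` makes the reader block when the workers fall behind.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ReaderWorkers {
    // Hypothetical sentinel that tells a worker to stop; not part of any SQS API.
    private static final String POISON = "__STOP__";

    // Hypothetical stand-in for sqsClient.receiveMessage(...): hands out up to 10
    // fake message bodies per call and an empty list once the fake queue is empty.
    static List<String> receiveBatch(AtomicInteger remaining) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < 10 && remaining.get() > 0) {
            batch.add("msg-" + remaining.getAndDecrement());
        }
        return batch;
    }

    // Returns how many messages the workers processed.
    public static int run(int totalMessages, int workerCount) {
        // Bounded buffer: the reader blocks when the workers fall behind.
        BlockingDeque<String> buffer = new LinkedBlockingDeque<>(100);
        AtomicInteger processed = new AtomicInteger();
        ExecutorService workers = Executors.newFixedThreadPool(workerCount);

        for (int i = 0; i < workerCount; i++) {
            workers.submit(() -> {
                try {
                    while (true) {
                        String body = buffer.takeFirst();
                        if (POISON.equals(body)) {
                            return; // no more messages for this worker
                        }
                        // The database hit and the SQS delete would go here.
                        processed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        try {
            // The single reader (here, the calling thread) moves batches into the buffer.
            AtomicInteger remaining = new AtomicInteger(totalMessages);
            List<String> batch;
            while (!(batch = receiveBatch(remaining)).isEmpty()) {
                for (String body : batch) {
                    buffer.putLast(body);
                }
            }
            // One sentinel per worker, queued after every real message.
            for (int i = 0; i < workerCount; i++) {
                buffer.putLast(POISON);
            }
            workers.shutdown();
            workers.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            workers.shutdownNow();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(95, 4)); // prints 95
    }
}
```

A single reader that fetches up to 10 messages per call may well keep several workers busy; if it does not, more than one reader can feed the same buffer. Keep in mind that messages sitting in the buffer have already been received, so their visibility timeout is already running.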

Upvotes: 9


Answers (2)

Manjay_TBAG

You will get duplicate messages; you can skip them using this code:

    // Skip (and queue for deletion) any message SQS has delivered more than once.
    // Checks Attributes for null before reading it; assumes the receive request
    // asked for the "ApproximateReceiveCount" attribute.
    if (message.Attributes != null &&
        message.Attributes.TryGetValue("ApproximateReceiveCount", out var receiveCount) &&
        receiveCount != "1")
    {
        AppLogger.LogError("Duplicate message arrived");
        listDeleteMessageBatchRequestEntry.Add(new DeleteMessageBatchRequestEntry
        {
            Id = message.MessageId,
            ReceiptHandle = message.ReceiptHandle
        });
        continue;
    }
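For the Java side of the question, the same check can be sketched as a small helper. This is a hedged sketch: `isRedelivery` is a hypothetical name, and it assumes it is given the attribute map from SDK v1's `Message.getAttributes()`, which only contains `ApproximateReceiveCount` if the `ReceiveMessageRequest` asked for that attribute (for example via `withAttributeNames`).

```java
import java.util.Map;

public class RedeliveryCheck {
    // Mirrors the C# answer: a message whose ApproximateReceiveCount is present
    // and not "1" has been delivered at least once before.
    public static boolean isRedelivery(Map<String, String> attributes) {
        if (attributes == null) {
            return false; // no attributes requested or returned
        }
        String count = attributes.get("ApproximateReceiveCount");
        return count != null && !"1".equals(count);
    }

    public static void main(String[] args) {
        System.out.println(isRedelivery(Map.of("ApproximateReceiveCount", "1"))); // false
        System.out.println(isRedelivery(Map.of("ApproximateReceiveCount", "3"))); // true
        System.out.println(isRedelivery(Map.of()));                               // false
    }
}
```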

Upvotes: -1

jbarrameda

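As you have a database hit for each message, processing each message likely takes some time. If processing takes longer than the queue's visibility timeout, the message becomes visible again and another thread receives it. You should increase the visibility timeout of the queue so that it exceeds your processing time.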

From AWS SQS documentation:

Immediately after the message is received, it remains in the queue. To prevent other consumers from processing the message again, Amazon SQS sets a visibility timeout, a period of time during which Amazon SQS prevents other consuming components from receiving and processing the message.

(http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html)
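To see why a visibility timeout shorter than the processing time produces duplicates, here is a toy, single-threaded simulation. It is a simplified model under stated assumptions (integer time ticks, each consumer handles one message at a time, a consumer deletes its message when processing ends), not how SQS is implemented; `totalReceives` and its parameters are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class VisibilityTimeoutSim {
    // Toy model of SQS visibility timeout, not SQS itself: time advances in integer
    // ticks, each consumer handles one message at a time, and a consumer deletes its
    // message when processing ends. Returns how many receives happened in total.
    public static int totalReceives(int messages, int consumers, int timeout, int processingTime) {
        Map<Integer, Integer> visibleAt = new HashMap<>(); // undeleted message id -> tick it becomes visible
        for (int id = 0; id < messages; id++) {
            visibleAt.put(id, 0);
        }
        int[] current = new int[consumers]; // message each consumer is processing, -1 if idle
        int[] doneAt = new int[consumers];  // tick at which that processing finishes
        Arrays.fill(current, -1);
        int receives = 0;

        for (int now = 0; !visibleAt.isEmpty(); now++) {
            for (int c = 0; c < consumers; c++) {
                if (current[c] >= 0 && doneAt[c] <= now) {
                    visibleAt.remove(current[c]); // delete; a no-op if another consumer already did
                    current[c] = -1;
                }
                if (current[c] < 0) {
                    // Receive any message whose visibility timeout has expired.
                    for (Map.Entry<Integer, Integer> e : visibleAt.entrySet()) {
                        if (e.getValue() <= now) {
                            e.setValue(now + timeout); // hide it from other consumers for `timeout` ticks
                            current[c] = e.getKey();
                            doneAt[c] = now + processingTime;
                            receives++;
                            break;
                        }
                    }
                }
            }
        }
        return receives;
    }

    public static void main(String[] args) {
        // Timeout longer than processing: each message is received exactly once.
        System.out.println(totalReceives(4, 2, 30, 10));
        // Timeout shorter than processing: some messages are received more than once.
        System.out.println(totalReceives(4, 2, 5, 10));
    }
}
```

In this model, an idle consumer picks up a message whose timeout has expired even though another consumer is still working on it, which is consistent with the duplicates described in the question. In SQS itself the relevant setting is the queue's `VisibilityTimeout` attribute.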

Upvotes: 5
