Reputation: 17435
Note: this is a repost of a thread from here.
Hi all, I've got a process that consumes messages from a single SQS queue. The queue can have many messages in it, and each message results in a database hit. Therefore I wanted to run several reader threads against this queue.
The basic code for each thread is:
    public void run() {
        while (true) {
            ReceiveMessageRequest rmr = new ReceiveMessageRequest(queueUrl)
                    .withMaxNumberOfMessages(10)
                    .withWaitTimeSeconds(3);
            List<Message> messages = sqsClient.receiveMessage(rmr).getMessages();
            // process messages
            // delete messages
        }
    }
What I'm seeing is that there are tons of duplicated messages between the threads. I know that I should expect a few duplicates here and there but it appears that each thread gets the same set of messages and, realistically, only one thread ever does much work.
Am I misunderstanding how to use the API, or am I doing something else wrong? The Javadocs indicate that the AmazonSQS client is thread-safe and, indeed, even creating a new AmazonSQS client instance for each thread changed nothing.
Any pointers would be most appreciated. My current thought of a fix is to have a single thread reading from the SQS queue, putting each message into something like a LinkedBlockingDeque and then have the workers reading that. But I feel that that implementation will not drain the queue as fast as I'd like.
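For reference, a minimal sketch of the single-reader idea in plain Java. Everything here is an assumption made for illustration: receiveBatch stands in for sqsClient.receiveMessage(rmr).getMessages(), messages are reduced to plain strings, handler stands in for the database hit plus the delete, and an empty batch is treated as "queue drained" only so that the sketch terminates. I used a bounded ArrayBlockingQueue rather than a LinkedBlockingDeque so the reader blocks when the workers fall behind.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

class SingleReaderFanOut {
    // Sentinel that tells a worker to stop. It is compared by identity,
    // so it cannot collide with a real message body.
    private static final String POISON = new String("POISON");

    /**
     * Runs the reader loop on the calling thread and `workers` worker threads.
     * receiveBatch is a hypothetical stand-in for the SQS receive call.
     */
    static void drain(Supplier<List<String>> receiveBatch, Consumer<String> handler, int workers)
            throws InterruptedException {
        // Bounded buffer: put() blocks the reader when the workers lag behind.
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(100);
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            pool.submit(() -> {
                try {
                    for (String msg = buffer.take(); msg != POISON; msg = buffer.take()) {
                        try {
                            handler.accept(msg); // real code: database hit, then delete from SQS
                        } catch (RuntimeException e) {
                            System.err.println("Failed to handle " + msg + ": " + e);
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // Single reader: only this loop talks to the queue.
        for (List<String> batch = receiveBatch.get(); !batch.isEmpty(); batch = receiveBatch.get()) {
            for (String msg : batch) {
                buffer.put(msg);
            }
        }
        for (int i = 0; i < workers; i++) {
            buffer.put(POISON); // one sentinel per worker
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
    }

    public static void main(String[] args) throws InterruptedException {
        // Fake source: two batches, then an empty one to end the demo.
        Iterator<List<String>> fake = Arrays.asList(
                Arrays.asList("m1", "m2", "m3"),
                Arrays.asList("m4"),
                Collections.<String>emptyList()).iterator();
        drain(fake::next,
                msg -> System.out.println(Thread.currentThread().getName() + " handled " + msg),
                4);
    }
}
```

With this shape each message is handed to exactly one worker, but throughput is capped by how fast the single reader can pull batches, which is the concern I mentioned above.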
Upvotes: 9
Views: 14265
Reputation: 2245
This setup will produce duplicate messages. You can detect and skip them by checking the ApproximateReceiveCount attribute, using this code:
    // A receive count other than "1" means this message has been delivered before.
    // The null check comes first so a message without attributes does not throw.
    if (message.Attributes != null &&
        message.Attributes.TryGetValue("ApproximateReceiveCount", out var receivedCount) &&
        receivedCount != "1")
    {
        AppLogger.LogError("Duplicate message arrived");
        // Queue the duplicate for batch deletion and move on to the next message.
        listDeleteMessageBatchRequestEntry.Add(new DeleteMessageBatchRequestEntry()
        {
            Id = message.MessageId,
            ReceiptHandle = message.ReceiptHandle
        });
        continue;
    }
Upvotes: -1
Reputation: 1997
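As you have a database hit for each message, it seems that processing each message takes time. If processing takes longer than the queue's visibility timeout, the messages become visible again before you delete them and other threads receive them too. You should increase the visibility timeout of the queue.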
From AWS SQS documentation:
Immediately after the message is received, it remains in the queue. To prevent other consumers from processing the message again, Amazon SQS sets a visibility timeout, a period of time during which Amazon SQS prevents other consuming components from receiving and processing the message.
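To illustrate the effect, here is a toy in-memory model in plain Java. It is not the SQS client API: the class, its methods and the message names are all made up for this sketch, and time is passed in explicitly instead of using a real clock. It only shows the rule quoted above, that a received message is hidden for the visibility timeout and reappears if it has not been deleted by then.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of visibility timeouts; hypothetical, NOT the SQS client API. */
class VisibilityTimeoutDemo {
    // message -> time (ms) at which it becomes receivable again
    private final Map<String, Long> visibleAtMillis = new LinkedHashMap<>();
    private final long visibilityTimeoutMillis;

    VisibilityTimeoutDemo(long visibilityTimeoutMillis, List<String> messages) {
        this.visibilityTimeoutMillis = visibilityTimeoutMillis;
        for (String m : messages) {
            visibleAtMillis.put(m, 0L); // visible from the start
        }
    }

    /** Returns every currently visible message and hides each one for the timeout. */
    synchronized List<String> receive(long nowMillis) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Long> e : visibleAtMillis.entrySet()) {
            if (e.getValue() <= nowMillis) {
                out.add(e.getKey());
                e.setValue(nowMillis + visibilityTimeoutMillis);
            }
        }
        return out;
    }

    /** Only an explicit delete removes a message; until then it can reappear. */
    synchronized void delete(String message) {
        visibleAtMillis.remove(message);
    }

    /**
     * Consumer A receives at t=0 and needs processingMillis before it deletes.
     * Returns what consumer B receives when it polls just before A finishes.
     */
    static List<String> seenBySecondConsumer(long timeoutMillis, long processingMillis) {
        VisibilityTimeoutDemo queue =
                new VisibilityTimeoutDemo(timeoutMillis, Arrays.asList("order-1", "order-2"));
        List<String> takenByA = queue.receive(0);
        List<String> takenByB = queue.receive(processingMillis - 1);
        for (String m : takenByA) {
            queue.delete(m); // A finishes and deletes, too late if B already has copies
        }
        return takenByB;
    }

    public static void main(String[] args) {
        // Processing (45 s) outlasts the timeout (30 s): B gets duplicates.
        System.out.println(seenBySecondConsumer(30_000, 45_000)); // prints [order-1, order-2]
        // Timeout (60 s) covers processing (45 s): B gets nothing.
        System.out.println(seenBySecondConsumer(60_000, 45_000)); // prints []
    }
}
```

In the real client you can raise the timeout on the queue itself (the VisibilityTimeout queue attribute) or per receive call; in the AWS SDK for Java 1.x used in the question, ReceiveMessageRequest has withVisibilityTimeout(seconds) for the latter. Pick a value comfortably longer than your worst-case processing time for a batch.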
Upvotes: 5