sdolgy
sdolgy

Reputation: 7001

Retrieve multiple messages from SQS

I have multiple messages in SQS. The following code always returns only one, even if there are dozens visible (not in flight). setMaxNumberOfMessages I thought would allow multiple to be consumed at once .. have i misunderstood this?

 CreateQueueRequest createQueueRequest = new CreateQueueRequest().withQueueName(queueName);
 String queueUrl = sqs.createQueue(createQueueRequest).getQueueUrl();
 ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl);
 receiveMessageRequest.setMaxNumberOfMessages(10);
 List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
 for (Message message : messages) {
      // i'm a message from SQS
 }

I've also tried using withMaxNumberOfMessages without any such luck:

 receiveMessageRequest.withMaxNumberOfMessages(10);

How do I know there are messages in the queue? More than 1?

 Set<String> attrs = new HashSet<String>();
 attrs.add("ApproximateNumberOfMessages");
 CreateQueueRequest createQueueRequest = new CreateQueueRequest().withQueueName(queueName);
 GetQueueAttributesRequest a = new GetQueueAttributesRequest().withQueueUrl(sqs.createQueue(createQueueRequest).getQueueUrl()).withAttributeNames(attrs);
 Map<String,String> result = sqs.getQueueAttributes(a).getAttributes();
 int num = Integer.parseInt(result.get("ApproximateNumberOfMessages"));

The above always is run prior and gives me an int that is >1

Thanks for your input

Upvotes: 35

Views: 48351

Answers (8)

Amit
Amit

Reputation: 51

Here's a workaround, you can call receiveMessageFromSQS method asynchronously.

   bulkReceiveFromSQS (queueUrl, totalMessages, asyncLimit, batchSize, visibilityTimeout, waitTime, callback) {
    batchSize = Math.min(batchSize, 10);

    let self = this,
        noOfIterations = Math.ceil(totalMessages / batchSize);

    async.timesLimit(noOfIterations, asyncLimit, function(n, next) {
        self.receiveMessageFromSQS(queueUrl, batchSize, visibilityTimeout, waitTime,
            function(err, result) {
                if (err) {
                    return next(err);
                }

                return next(null, _.get(result, 'Messages'));
            });
    }, function (err, listOfMessages) {
        if (err) {
            return callback(err);
        }
        listOfMessages = _.flatten(listOfMessages).filter(Boolean);

        return callback(null, listOfMessages);
    });
}

It will return you an array with a given number of messages

Upvotes: 0

Fduch
Fduch

Reputation: 11

For small task list I use FIFO queue like stackoverflow.com/a/55149351/13678017 for example modified AWS tutorial

            // Create a queue.
        System.out.println("Creating a new Amazon SQS FIFO queue called " + "MyFifoQueue.fifo.\n");
        final Map<String, String> attributes = new HashMap<>();

        // A FIFO queue must have the FifoQueue attribute set to true.
        attributes.put("FifoQueue", "true");
        /*
         * If the user doesn't provide a MessageDeduplicationId, generate a
         * MessageDeduplicationId based on the content.
         */
        attributes.put("ContentBasedDeduplication", "true");
        // The FIFO queue name must end with the .fifo suffix.
        final CreateQueueRequest createQueueRequest = new CreateQueueRequest("MyFifoQueue4.fifo")
                        .withAttributes(attributes);
        final String myQueueUrl = sqs.createQueue(createQueueRequest).getQueueUrl();

        // List all queues.
        System.out.println("Listing all queues in your account.\n");
        for (final String queueUrl : sqs.listQueues().getQueueUrls()) {
            System.out.println("  QueueUrl: " + queueUrl);
        }
        System.out.println();

        // Send a message.
        System.out.println("Sending a message to MyQueue.\n");

        for (int i = 0; i < 4; i++) {

            var request = new SendMessageRequest()
                    .withQueueUrl(myQueueUrl)
                    .withMessageBody("message " + i)
                    .withMessageGroupId("userId1");
                    ;

            sqs.sendMessage(request);
        }

        for (int i = 0; i < 6; i++) {

            var request = new SendMessageRequest()
                    .withQueueUrl(myQueueUrl)
                    .withMessageBody("message " + i)
                    .withMessageGroupId("userId2");
                    ;

            sqs.sendMessage(request);
        }

        // Receive messages.
        System.out.println("Receiving messages from MyQueue.\n");
        var receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);

        receiveMessageRequest.setMaxNumberOfMessages(10);
        receiveMessageRequest.setWaitTimeSeconds(20);

        // what receive?
        receiveMessageRequest.withMessageAttributeNames("userId2");


        final List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
        for (final Message message : messages) {
            System.out.println("Message");
            System.out.println("  MessageId:     "
                    + message.getMessageId());
            System.out.println("  ReceiptHandle: "
                    + message.getReceiptHandle());
            System.out.println("  MD5OfBody:     "
                    + message.getMD5OfBody());
            System.out.println("  Body:          "
                    + message.getBody());
            for (final Entry<String, String> entry : message.getAttributes()
                    .entrySet()) {
                System.out.println("Attribute");
                System.out.println("  Name:  " + entry
                        .getKey());
                System.out.println("  Value: " + entry
                        .getValue());
            }
        }

Upvotes: 1

muyong
muyong

Reputation: 219

Thanks Caoilte!

I faced this issue also. Finally solved by using long polling follow the configuration here: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-configure-long-polling-for-queue.html

Unfortunately, to use long polling, you must create your queue as FIFO one. I tried standard queue with no luck.

And when receiving, need also set MaxNumberOfMessages. So my code is like:

ReceiveMessageRequest receive_request = new ReceiveMessageRequest() .withQueueUrl(QUEUE_URL) .withWaitTimeSeconds(20) .withMaxNumberOfMessages(10);

Although solved, still feel too wired. AWS should definitely provide a more neat API for this kind of basic receiving operation.

From my point, AWS has many many cool features but not good APIs. Like those guys are rushing out all the time.

Upvotes: 0

niranjan
niranjan

Reputation: 159

I was just trying the same and with the help of these two attributes setMaxNumberOfMessages and setWaitTimeSeconds i was able to get 10 messages.

ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);
                      receiveMessageRequest.setMaxNumberOfMessages(10);
                      receiveMessageRequest.setWaitTimeSeconds(20);

Snapshot of o/p:

Receiving messages from TestQueue.
Number of messages:10
Message
MessageId:     31a7c669-1f0c-4bf1-b18b-c7fa31f4e82d 
...

Upvotes: 5

Caoilte
Caoilte

Reputation: 2401

There is a comprehensive explanation for this (arguably rather idiosyncratic) behaviour in the SQS reference documentation.

SQS stores copies of messages on multiple servers and receive message requests are made to these servers with one of two possible strategies,

  • Short Polling : The default behaviour, only a subset of the servers (based on a weighted random distribution) are queried.
  • Long Polling : Enabled by setting the WaitTimeSeconds attribute to a non-zero value, all of the servers are queried.

In practice, for my limited tests, I always seem to get one message with short polling just as you did.

Upvotes: 11

wisbucky
wisbucky

Reputation: 37797

I had the same problem. What is your Receive Message Wait Time for your queue set to? When mine was at 0, it only returned 1 message even if there were 8 in the queue. When I increased the Receive Message Wait Time, then I got all of them. Seems kind of buggy to me.

Upvotes: 6

Marcus
Marcus

Reputation: 2238

receiveMessageRequest.withMaxNumberOfMessages(10);

Just to be clear, the more practical use of this would be to add to your constructor like this:

ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(10);

Otherwise, you might as well just do:

receiveMessageRequest.setMaxNumberOfMessages(10);

That being said, changing this won't help the original problem.

Upvotes: 1

sdolgy
sdolgy

Reputation: 7001

AWS API Reference Guide: Query/QueryReceiveMessage

Due to the distributed nature of the queue, a weighted random set of machines is sampled on a ReceiveMessage call. That means only the messages on the sampled machines are returned. If the number of messages in the queue is small (less than 1000), it is likely you will get fewer messages than you requested per ReceiveMessage call. If the number of messages in the queue is extremely small, you might not receive any messages in a particular ReceiveMessage response; in which case you should repeat the request.

and

MaxNumberOfMessages: Maximum number of messages to return. SQS never returns more messages than this value but might return fewer.

Upvotes: 39

Related Questions