Deepak

Reputation: 163

Consuming messages in batches in amazon sqs, alpine sqs spring boot

I configured an SQS listener to consume messages as a List of Message objects, but I only receive a single message at a time, and I get this error: cannot convert model.StudentData to the instance of java.util.ArrayList<com.amazonaws.services.sqs.model.Message>

My code is:

    @SqsListener(value = "${queueName}", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
    public void receiveMessage(final StudentData studentData,
                               @Header("SenderId") final String senderId,
                               final Acknowledgment acknowledgment) {

        // business logic
        acknowledgment.acknowledge();
    }

Any suggestions on how to configure the SQS listener to consume multiple messages at once? Any help will be appreciated.

Upvotes: 4

Views: 6007

Answers (2)

Deepak

Reputation: 163

The solution to the above issue is to poll the queue directly with the AmazonSQS client on a background thread:

    final ExecutorService executorService = Executors.newSingleThreadExecutor();
    executorService.execute(() -> {
        // Resolve the queue URL once instead of on every loop iteration.
        final String queueUrl = amazonSqs.getQueueUrl("enter your queue name").getQueueUrl();
        final var receiveMessageRequest = new ReceiveMessageRequest(queueUrl)
                .withMaxNumberOfMessages(10) // default is 1; 10 is the SQS maximum per call
                .withWaitTimeSeconds(20);    // long polling

        while (true) {
            final List<Message> messages = amazonSqs.receiveMessage(receiveMessageRequest).getMessages();
            for (final Message queueMessage : messages) {
                try {
                    final String message = queueMessage.getBody();
                    // business logic goes here; delete only after it succeeds
                    amazonSqs.deleteMessage(new DeleteMessageRequest(queueUrl, queueMessage
                            .getReceiptHandle()));
                } catch (Exception e) {
                    log.error("Received message with errors", e);
                }
            }
        }
    });
    // Stops accepting new tasks; the polling task submitted above keeps running.
    executorService.shutdown();

Upvotes: 3

Borislav Stoilov

Reputation: 3677

The @SqsListener annotation provides the simplest configuration, but it consumes messages one by one. This limitation comes directly from Spring's QueueMessagingTemplate.

To consume batches, you can use the AmazonSQS client directly:

    @Autowired AmazonSQSAsync amazonSqs;
    ...

    String queueUrl = amazonSqs.getQueueUrl("queueName").getQueueUrl();
    ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest();
    receiveMessageRequest.setQueueUrl(queueUrl);
    receiveMessageRequest.setWaitTimeSeconds(10); // long poll: wait up to 10 seconds for messages
    receiveMessageRequest.setMaxNumberOfMessages(10); // SQS returns at most 10 messages per receive call
    ReceiveMessageResult receiveMessageResult = amazonSqs.receiveMessage(receiveMessageRequest);
    List<Message> messages = receiveMessageResult.getMessages(); // batch of messages
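
Since a single receive call returns at most 10 messages, a larger batch has to be assembled from several calls. Below is a minimal sketch of that accumulation loop, not a definitive implementation. To keep it runnable without the AWS SDK, `MessageSource` is a hypothetical stand-in for `amazonSqs.receiveMessage(...)`, and `BatchCollector` and `collect` are illustrative names that are not part of any library. It also assumes an empty response means there is nothing more to read right now, which holds for this in-memory fake but is only approximately true for a real queue.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchCollector {

    /** Hypothetical stand-in for one SQS receive call; not part of the AWS SDK. */
    interface MessageSource {
        List<String> receive(int maxMessages);
    }

    /** SQS returns at most 10 messages per receive call. */
    static final int MAX_PER_RECEIVE = 10;

    /** Calls the source repeatedly until targetSize bodies are collected or a call returns nothing. */
    static List<String> collect(MessageSource source, int targetSize) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < targetSize) {
            int toRequest = Math.min(MAX_PER_RECEIVE, targetSize - batch.size());
            List<String> received = source.receive(toRequest);
            if (received.isEmpty()) {
                break; // nothing available right now; return what we have
            }
            batch.addAll(received);
        }
        return batch;
    }

    public static void main(String[] args) {
        // In-memory fake queue holding 25 message bodies (assumption for the demo).
        List<String> queue = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            queue.add("message-" + i);
        }

        MessageSource fake = max -> {
            int n = Math.min(max, queue.size());
            List<String> out = new ArrayList<>(queue.subList(0, n));
            queue.subList(0, n).clear(); // remove what was handed out
            return out;
        };

        System.out.println(collect(fake, 20).size()); // 20
        System.out.println(collect(fake, 20).size()); // 5
    }
}
```

With the real client, the lambda would wrap a `receiveMessage` call with `setMaxNumberOfMessages(max)` and map each `Message` to its body. Keep the visibility timeout in mind: messages collected early in the loop can become visible again if the batch takes too long to process and delete.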

Upvotes: 2
