Reputation: 1137
I have a simple class named QueueService
with some methods that wrap the methods from the AWS SQS SDK for Java. For example:
public ArrayList<Hashtable<String, String>> receiveMessages(String queueURL) {
List<Message> messages = this.sqsClient.receiveMessage(queueURL).getMessages();
ArrayList<Hashtable<String, String>> resultList = new ArrayList<Hashtable<String, String>>();
for(Message message : messages) {
Hashtable<String, String> resultItem = new Hashtable<String, String>();
resultItem.put("MessageId", message.getMessageId());
resultItem.put("ReceiptHandle", message.getReceiptHandle());
resultItem.put("Body", message.getBody());
resultList.add(resultItem);
}
return resultList;
}
I have another another class named App
that has a main
and creates an instace of the QueueService
.
I looking for a "pattern" to make the main
in App
to listen for new messages in the queue. Right now I have a while(true)
loop where I call the receiveMessages
method:
while(true) {
messages = queueService.receiveMessages(queueURL);
for(Hashtable<String, String> message: messages) {
String receiptHandle = message.get("ReceiptHandle");
String messageBody = message.get("MessageBody");
System.out.println(messageBody);
queueService.deleteMessage(queueURL, receiptHandle);
}
}
Is this the correct way? Should I use the async message receive method in SQS SDK?
Upvotes: 30
Views: 47894
Reputation: 369
It is several years later, but just in case someone searches this topic, i'm posting my solution. I'm not sure if this is the best one but works for us. Note that this uses Project Reactor. Also note that this solution is used for not very time-critical messages - delays of several minutes are ok for us here.
package myapp.amazonsqs;
import static org.apache.logging.log4j.LogManager.getLogger;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import org.apache.logging.log4j.Logger;
import reactor.core.publisher.Flux;
import reactor.util.retry.Retry;
import java.time.Duration;
import java.util.Map;
import java.util.stream.Collectors;
public class MyAmazonSqsMessagingGateway {
private static final Logger LOGGER = getLogger(MyAmazonSqsMessagingGateway.class);
private static final long POLLING_PERIOD_SECONDS = 30L;
// max 20
private static final int POLL_WAIT_TIME_SECONDS = 20;
private static final long MINIMUM_RETRY_PERIOD_SECONDS = 30L;
private final String amazonAwsRegion;
private final String amazonAwsAccessKeyId;
private final String amazonAwsAccessKeySecret;
private final String queueName;
private AmazonSQS amazonSqsClient;
public MyAmazonSqsMessagingGateway(
final String amazonAwsRegion,
final String amazonAwsAccessKeyId,
final String amazonAwsAccessKeySecret,
final String queueName
) {
this.amazonAwsRegion = amazonAwsRegion;
this.amazonAwsAccessKeyId = amazonAwsAccessKeyId;
this.amazonAwsAccessKeySecret = amazonAwsAccessKeySecret;
this.queueName = queueName;
}
public void init() {
this.amazonSqsClient = createClient();
start();
}
private AmazonSQS createClient() {
return AmazonSQSClientBuilder
.standard()
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(
this.amazonAwsAccessKeyId,
this.amazonAwsAccessKeySecret
)))
.withRegion(Regions.fromName(this.amazonAwsRegion))
.build();
}
private void start() {
LOGGER.debug("Starting..");
final String queueUrl = getAndCheckMessageQueueUrl();
final Duration initialDelay = Duration.ofSeconds(1L);
final Duration pollingPeriod = Duration.ofSeconds(POLLING_PERIOD_SECONDS);
final Duration minimumRetryPeriod = Duration.ofSeconds(MINIMUM_RETRY_PERIOD_SECONDS);
// retry indefinitely with backoff, until this application is stopped
final long maxNumberOfRetryAttempts = Long.MAX_VALUE;
Flux.interval(initialDelay, pollingPeriod)
.map(ignoredParameter -> receiveMessages(this.amazonSqsClient, queueUrl))
.retryWhen(Retry
.backoff(maxNumberOfRetryAttempts, minimumRetryPeriod)
.doBeforeRetry(retrySignal -> LOGGER.warn(
"Exception when receiving messages, retrying.. ",
retrySignal.failure()
))
.doAfterRetry(retrySignal -> LOGGER.debug("Retry complete."))
)
.subscribe(
receiveMessageResult -> receiveMessageResult
.getMessages()
.forEach(this::handleMessage),
throwable -> LOGGER.error(
"Non-recoverable exception when receiving messages from Amazon SQS: ",
throwable
)
);
LOGGER.debug("Start completed.");
}
private ReceiveMessageResult receiveMessages(final AmazonSQS amazonSqsClient, final String queueUrl) {
LOGGER.debug("Receiving messages...");
return amazonSqsClient.receiveMessage(new ReceiveMessageRequest(
queueUrl).withWaitTimeSeconds(POLL_WAIT_TIME_SECONDS)
.withMaxNumberOfMessages(10));
}
private String getAndCheckMessageQueueUrl() {
final String queueUrl = amazonSqsClient
.getQueueUrl(this.queueName)
.getQueueUrl();
if (queueUrl == null) {
throw new IllegalStateException("queueUrl is null, cannot run!");
} else {
LOGGER.info(() -> String.format("Listening in queue %s", queueUrl));
}
return queueUrl;
}
private void handleMessage(final Message message) {
logMessage(message);
// do something useful with the message here.
}
private static void logMessage(final Message message) {
if (LOGGER.isDebugEnabled()) {
final Map<String, String> attributes = message.getAttributes();
final String attributesAsSingleString = attributes
.keySet()
.stream()
.map(key -> "Attribute " + key + " value = " + attributes.get(key))
.collect(Collectors.joining("\n"));
LOGGER.debug("Message received! id = "
+ message.getMessageId()
+ "\nreceipt handle = "
+ message.getReceiptHandle()
+ "\n"
+ attributesAsSingleString
+ "body:\n"
+ message.getBody());
}
}
}
Upvotes: 0
Reputation: 31
I found one solution for actively listening the queue. For Node. I have used the following package and resolved my issue.
sqs-consumer
Link https://www.npmjs.com/package/sqs-consumer
Upvotes: 0
Reputation: 1651
As of 2019 SQS can trigger lambdas: https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html
Upvotes: 0
Reputation: 428
If you want to use SQS and then lambda to process the request you can follow the steps given in the link or you always use lambda instead of SQS and invoke lambda for every request.
Upvotes: 0
Reputation: 471
To my knowledge, there is no way in Amazon SQS to support an active listener model where Amazon SQS would "push" messages to your listener, or would invoke your message listener when there are messages.
So, you would always have to poll for messages. There are two polling mechanisms supported for polling - Short Polling and Long Polling. Each has its own pros and cons, but Long Polling is the one you would typically end up using in most cases, although the default one is Short Polling. Long Polling mechanism is definitely more efficient in terms of network traffic, is more cost efficient (because Amazon charges you by the number of requests made), and is also the preferred mechanism when you want your messages to be processed in a time sensitive manner (~= process as soon as possible).
There are more intricacies around Long Polling and Short Polling that are worth knowing, and its somewhat difficult to paraphrase all of that here, but if you like, you can read a lot more details about this through the following blog. It has a few code examples as well that should be helpful.
http://pragmaticnotes.com/2017/11/20/amazon-sqs-long-polling-versus-short-polling/
In terms of a while(true) loop, I would say it depends. If you are using Long Polling, and you can set the wait time to be (max) 20 seconds, that way you do not poll SQS more often than 20 seconds if there are no messages. If there are messages, you can decide whether to poll frequently (to process messages as soon as they arrive) or whether to always process them in time intervals (say every n seconds).
Another point to note would be that you could read upto 10 messages in a single receiveMessages request, so that would also reduce the number of calls you make to SQS, thereby reducing costs. And as the above blog explains in details, you may request to read 10 messages, but it may not return you 10 even if there are that many messages in the queue.
In general though, I would say you need to build appropriate hooks and exception handling to turn off the polling if you wish to at runtime, in case you are using a while(true) kind of a structure.
Another aspect to consider is whether you would like to poll SQS in your main application thread or you would like to spawn another thread. So another option could be to create a ScheduledThreadPoolExecutor with a single thread in the main to schedule a thread to poll the SQS periodically (every few seconds), and you may not need a while(true) structure.
Upvotes: 36
Reputation: 21
There are a few things that you're missing:
receiveMessages(ReceiveMessageRequest)
and set a wait time to enable long polling.OverLimitException
, which can be thrown from receiveMessages()
if you would have too many in-flight messages.while
loop in its own try/catch block, logging any exceptions that are caught (there shouldn't be -- this is here to ensure that your application doesn't crash because AWS changed their API or you neglected to handle an expected exception).See doc for more information about long polling and possible exceptions.
As for using the async client: do you have any particular reason to use it? If not, then don't: a single receiver thread is much easier to manage.
Upvotes: 2