Reputation: 496
I'm implementing my own webhooks service which will send out events to subscribed webhooks.
Overview of architecture:
As far as I can tell, the call to change the message visibility is succeeding. I'm wondering if there's something else baked into lambdas that are invoked by SQS. Upon failure from the lambda, is it internally changing the message visibility again? Or do lambdas that are invoked by SQS not respect message visibility changes (this really doesn't make any sense to me). Curious if anyone has any insight into this problem. I was quite surprised to find out that lambda automatically deletes messages upon success since it makes my particular use-case a little clunky feeling - throwing an error to fail the lambda function to prevent the message from being deleted.
Thanks in advance!
Upvotes: 3
Views: 3576
Reputation: 1
Since SQS triggers for lambdas process messages in batches, using await in a standard forEach doesn't work (forEach does not await the callbacks if they are async). To bypass that, you can create your own version of an async forEach:
var AWS = require('aws-sdk');
exports.handler = async function(event, context) {
var sqs = new AWS.SQS({apiVersion: '2012-11-05'});
await asyncForEach(event.Records, async record => {
const { receiptHandle } = record;
const sqsParams = {
QueueUrl: '<YOUR_QUEUE_URL>', /* required */
ReceiptHandle: receiptHandle, /* required */
VisibilityTimeout: '<in seconds>' /* required */
};
try {
let res = await sqs.changeMessageVisibility(sqsParams).promise();
console.log(res);
} catch (err) {
console.log(err, err.stack);
throw new Error('Fail.');
}
}
});
return {};
};
async function asyncForEach(array, callback) {
for (let index = 0; index < array.length; index++) {
await callback(array[index], index, array);
}
}
Upvotes: 0
Reputation: 496
Update: This is actually not the case. I was failing to properly wait for the call to adjust the timeout to finish. As such the lambda was closing before that request was completing. The timeout I'm setting on the message within the lambda is being respected. I'm then throwing an error to prevent the message from being deleted.
Upvotes: 2
Reputation: 8887
The nature of the SQS integration with Lambda is that the integration controls the polling of the messages. The mechanism used to determine if the message should be deleted is that the response from the lambda is not an error. It's not clearly stated in the documentation, but I believe when an error occurs the integration is going to set the visibility timeout to zero in order to make it immediately available for another process to pick it up. So in your example you are setting it to some number that allows you to retry, but when you return an error the integration is setting the timeout back to zero. If you need to have more control over that process you probably should not use the integration.
Upvotes: 2