Reputation: 13
We have a process that takes from 30 seconds to 5 minutes depending on the event. Our base architecture looks as follows (Python 3.9, boto3 send_message to SQS); the VPC and NAT gateway are configured.
We noticed that subsequent messages are always processed at 10-minute intervals, even though the function invocations complete successfully much earlier. To manage time more efficiently, we tried shortening the message visibility timeout to 1 minute by default and extending it by 1 minute every 30 seconds during function B's execution (using the boto3 client's change_message_visibility). This does not seem to work: almost all the messages end up in our dead-letter queue, and the CloudWatch logs do not suggest any error.
Our code:

Globals:
  Function:
    Timeout: 30
    Environment:
      Variables:
        SQS_QUEUE_URL: !Ref SqsQueue

Resources:
  SqsQueue:
    Type: AWS::SQS::Queue
    Properties:
      VisibilityTimeout: 600
      RedrivePolicy:
        deadLetterTargetArn:
          Fn::GetAtt:
            - DeadLetterQueue
            - Arn
        maxReceiveCount: 10

  DeadLetterQueue:
    Type: AWS::SQS::Queue

  LambdaFunctionBRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Policies:
        - PolicyName: receiveAndDeleteFromQueue
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - sqs:ReceiveMessage
                  - sqs:ChangeMessageVisibility
                  - sqs:DeleteMessage
                  - sqs:GetQueueAttributes
                Resource: !GetAtt SqsQueue.Arn
        - PolicyName: accessVpc
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                  - ec2:CreateNetworkInterface
                  - ec2:DescribeNetworkInterfaces
                  - ec2:DeleteNetworkInterface
                  - ec2:AssignPrivateIpAddresses
                  - ec2:UnassignPrivateIpAddresses
                Resource: "*"

  LambdaFunctionB:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: src/function_b/
      Handler: app.lambda_handler
      Runtime: python3.9
      Role: !GetAtt LambdaFunctionBRole.Arn
      Architectures:
        - x86_64
      VpcConfig:
        SecurityGroupIds:
          - "xxx"
        SubnetIds:
          - "xxx"
          - "xxx"
          - "xxx"
      FileSystemConfigs:
        - Arn: !GetAtt AccessPoint.Arn
          LocalMountPath: /mnt/lambda
      ReservedConcurrentExecutions: 1
      Events:
        SqsEvent:
          Type: SQS
          Properties:
            Queue: !GetAtt SqsQueue.Arn
            BatchSize: 1
            Enabled: True
      Timeout: 600
      MemorySize: 512
Lambda function B code app.py:

import os
import json
import boto3
<some other imports>

sqs_queue_url = os.environ.get("SQS_QUEUE_URL")
sqs_client = boto3.client("sqs")

def lambda_handler(event, context):
    status = "ok"
    record = event["Records"][0]
    message = json.loads(record["body"])
    receipt_handle = record["receiptHandle"]
    <some business logic here>
    return status
The modified version with the visibility-timeout heartbeat:

import os
import json
import boto3
from threading import Timer
from time import sleep
<some other imports>

sqs_queue_url = os.environ.get("SQS_QUEUE_URL")
sqs_client = boto3.client("sqs")

class RepeatingTimer(Timer):
    def run(self):
        while not self.finished.is_set():
            self.function(*self.args, **self.kwargs)
            self.finished.wait(self.interval)

def increase_visibility_timeout(sqs_client, sqs_queue_url, receipt_handle, timeout):
    sqs_client.change_message_visibility(
        QueueUrl=sqs_queue_url,
        ReceiptHandle=receipt_handle,
        VisibilityTimeout=timeout,
    )

def lambda_handler(event, context):
    status = "ok"
    record = event["Records"][0]
    message = json.loads(record["body"])
    receipt_handle = record["receiptHandle"]
    t = RepeatingTimer(30, increase_visibility_timeout, [sqs_client, sqs_queue_url, receipt_handle, 60])
    t.start()
    <some business logic here>
    increase_visibility_timeout(sqs_client, sqs_queue_url, receipt_handle, 5)
    t.cancel()
    return status
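For what it's worth, the heartbeat pattern above can be exercised locally without AWS. Note that ChangeMessageVisibility sets a new timeout measured from the time of the call, rather than adding to the remaining one. The FakeSqsClient below is our stand-in for the boto3 client, not a real SQS call:

```python
from threading import Timer
from time import sleep

class RepeatingTimer(Timer):
    """Calls `function` every `interval` seconds until cancelled."""
    def run(self):
        while not self.finished.is_set():
            self.function(*self.args, **self.kwargs)
            self.finished.wait(self.interval)

class FakeSqsClient:
    """Stand-in for boto3's SQS client; records each visibility change."""
    def __init__(self):
        self.calls = []
    def change_message_visibility(self, QueueUrl, ReceiptHandle, VisibilityTimeout):
        self.calls.append(VisibilityTimeout)

def increase_visibility_timeout(sqs_client, sqs_queue_url, receipt_handle, timeout):
    sqs_client.change_message_visibility(
        QueueUrl=sqs_queue_url,
        ReceiptHandle=receipt_handle,
        VisibilityTimeout=timeout,
    )

client = FakeSqsClient()
# Heartbeat every 0.1 s here (30 s in the real handler), resetting visibility to 60 s.
t = RepeatingTimer(0.1, increase_visibility_timeout,
                   [client, "https://example/queue", "fake-receipt-handle", 60])
t.start()
sleep(0.35)            # simulate the business logic running
t.cancel()
t.join()
print(client.calls)    # e.g. [60, 60, 60, 60] -- one entry per heartbeat tick
```

This confirms the timer fires repeatedly and each tick issues one visibility call; with a real queue, each call restarts a fresh 60-second invisibility window.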
Upvotes: 1
Views: 4141
Reputation: 270184
The normal process for handling messages from an Amazon SQS queue is:
- A consumer calls ReceiveMessage, which makes the message temporarily invisible to other consumers
- The consumer processes the message
- The consumer calls DeleteMessage to remove the message from the queue
If the message is not deleted before the invisibility timeout expires, it becomes visible again and can be received by another consumer.
In general, the invisibility timeout should be set high enough that it only triggers when something has gone wrong with the processing. For example, if it normally takes 1 minute to process a message, you might set the invisibility period to 4 minutes. This allows for unusual situations where processing takes longer, without accidentally reprocessing a message that was actually processed successfully.
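This receive/process/delete cycle and the invisibility window can be simulated without AWS. The ToyQueue below is a toy in-memory model of SQS visibility semantics, not the real service:

```python
import time

class ToyQueue:
    """Minimal in-memory model of SQS visibility semantics."""
    def __init__(self, visibility_timeout):
        self.visibility_timeout = visibility_timeout
        self.messages = []  # list of [body, invisible_until]

    def send(self, body):
        self.messages.append([body, 0.0])

    def receive(self):
        # Return the first visible message and hide it for the timeout period.
        now = time.monotonic()
        for m in self.messages:
            if m[1] <= now:
                m[1] = now + self.visibility_timeout
                return m[0]
        return None

    def delete(self, body):
        self.messages = [m for m in self.messages if m[0] != body]

q = ToyQueue(visibility_timeout=0.2)
q.send("job-1")

first = q.receive()            # message becomes invisible for 0.2 s
assert q.receive() is None     # hidden while it is being processed
time.sleep(0.25)               # processing overran the visibility timeout...
second = q.receive()           # ...so the same message is delivered again
q.delete(second)               # deleting before the timeout prevents redelivery
```

The overrun in the middle is exactly the failure mode the answer describes: a message that is neither deleted nor kept invisible long enough gets redelivered, and enough redeliveries push it over maxReceiveCount into the dead-letter queue.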
Your situation
You mention that "lambda waits until SQS queue timeout passes to start to process the next message". This suggests that something strange is happening with your process. Normally, when the Lambda function exits without an error, the Lambda service would invoke the function again with the next message.

There is no direct link between message invisibility periods and when a Lambda function is executed -- that is, given your configuration, there is nothing that would 'kill' an existing Lambda function when the message timeout is reached, nor is there anything that would delay the next Lambda invocation until a message timeout has occurred. The function, however, will be 'killed' when the Lambda function timeout is reached.

Therefore, it sounds like your Lambda function is timing out before it has finished processing the message. You should increase the Lambda function's timeout to be several times bigger than the expected run duration. (This is a separate configuration from the SQS queue invisibility period.)
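As a sketch (values illustrative, not a drop-in fix), the two timeouts live in different places in a SAM template. For a job that can run up to 5 minutes, the function timeout must comfortably exceed that, and the AWS Lambda developer guide recommends setting the source queue's visibility timeout to at least six times the function's timeout when using an SQS event source:

```yaml
Resources:
  SqsQueue:
    Type: AWS::SQS::Queue
    Properties:
      # SQS-side: at least the function timeout; AWS suggests 6x for Lambda event sources.
      VisibilityTimeout: 3600
  LambdaFunctionB:
    Type: AWS::Serverless::Function
    Properties:
      # Lambda-side: must exceed the longest expected run (here, 5 minutes).
      Timeout: 600
```

With both set generously, a successfully completed invocation lets the Lambda service delete the message itself, and no heartbeat is needed at all.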
Upvotes: 5