KKJ
KKJ

Reputation: 13

How to manage message visibility timeout effectively?

We have a process that takes from 30 seconds to 5 minutes depending on the event. Our base architecture looks as follows (python 3.9, boto3 to send_message to sqs):

The VPC and NAT gateway are configured.

We spotted that the subsequent messages are processed always in 10 minutes intervals although functions invocations are completed successufully earlier. We intend to manage time more efficiently and tried to shorten message visibility timeout to 1 minute by default and extend it by 1 minute every 30 seconds (we use boto3 client.change_message_visibility) during the function B execution. It seems it does not work - almost all the messages end up in our dead letter queue (the CloudWatch logs do not suggest any error).

Our codes:

  1. The basic code (before our attempt to optimize time of the whole process; at least it worked is such a way that all the messages were processed successfully; however it took too much time because messages are processed in 10 minutes intervals even though some were processed in e.g. 2 minutes): Template.yaml exerpts concerning SQS and lambda function B:
Globals:
  Function:
    Timeout: 30
    Environment:
      Variables:
        SQS_QUEUE_URL: !Ref SqsQueue
Resources:
  SqsQueue:
    Type: AWS::SQS::Queue
    Properties:
      VisibilityTimeout: 600
      RedrivePolicy:
        deadLetterTargetArn:
          Fn::GetAtt:
            - DeadLetterQueue
            - Arn
        maxReceiveCount: 10
  DeadLetterQueue:
    Type: AWS::SQS::Queue
  LambdaFunctionBRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Policies:
        - PolicyName: receiveAndDeleteFromQueue
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: [
                  sqs:ReceiveMessage,
                  sqs:ChangeMessageVisibility,
                  sqs:DeleteMessage,
                  sqs:GetQueueAttributes
                ]
                Resource: !GetAtt SqsQueue.Arn
        - PolicyName: accessVpc
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:  [
                  logs:CreateLogGroup,
                  logs:CreateLogStream,
                  logs:PutLogEvents,
                  ec2:CreateNetworkInterface,
                  ec2:DescribeNetworkInterfaces,
                  ec2:DeleteNetworkInterface,
                  ec2:AssignPrivateIpAddresses,
                  ec2:UnassignPrivateIpAddresses
                ]
                Resource: "*"
  LambdaFunctionB:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: src/function_b/
      Handler: app.lambda_handler
      Runtime: python3.9
      Role: !GetAtt LambdaFunctionBExecutionRole.Arn 
      Architectures:
        - x86_64
      VpcConfig:
        SecurityGroupIds:
          - "xxx"
        SubnetIds:
          - "xxx"
          - "xxx"
          - "xxx"
      FileSystemConfigs:
        - Arn: !GetAtt AccessPoint.Arn
          LocalMountPath: /mnt/lambda
      ReservedConcurrentExecutions: 1
      Events:
        SqsEvent:
          Type: SQS
          Properties:
            Queue: !GetAtt SqsQueue.Arn
            BatchSize: 1
            Enabled: True
      Timeout: 600
      MemorySize: 512 

Lambda function B code app.py:

import os
import boto3
<some other imports>

sqs_queue_url = os.environ.get("SQS_QUEUE_URL")
sqs_client = boto3.client("sqs")

def lambda_handler(event, context):
    status = "ok"

    record = event["Records"][0]
    message = json.loads(record["body"])
    receipt_handle = record["receiptHandle"]
    
    <some business logic here>
    
    return status
  1. After we decided to try to optimize time efficiency we changed VisibilityTimeout:60 for SqsQueue and changed the app.py file for lambda function B as follows:
import os
import boto3
from threading import Timer
from time import sleep
<some other imports>

sqs_queue_url = os.environ.get("SQS_QUEUE_URL")
sqs_client = boto3.client("sqs")

class RepeatingTimer(Timer):
    def run(self):
        while not self.finished.is_set():
            self.function(*self.args, **self.kwargs)
            self.finished.wait(self.interval)
def increase_visibility_timeout(sqs_client, sqs_queue_url, receipt_handle, timeout):
        sqs_client.change_message_visibility(
            QueueUrl=sqs_queue_url,
            ReceiptHandle=receipt_handle,
            VisibilityTimeout=timeout
        )

def lambda_handler(event, context):
    status = "ok"

    record = event["Records"][0]
    message = json.loads(record["body"])
    receipt_handle = record["receiptHandle"]
    
    t = RepeatingTimer(30, increase_visibility_timeout, [sqs_client, sqs_queue_url, receipt_handle, 60])
    t.start() 

    <some business logic here>
    
    increase_visibility_timeout(sqs_client, sqs_queue_url, receipt_handle, 5)
    t.cancel()

    return status

Upvotes: 1

Views: 4141

Answers (1)

John Rotenstein
John Rotenstein

Reputation: 270184

The normal process for handling messages from an Amazon SQS queue is:

  • A process sends messages to the SQS Queue
  • Since you have an AWS Lambda function with a trigger on the SQS queue, and it is configured with a batch size of 1, only one message will be passed to the Lambda function
  • Additionally, since the Lambda function is configured with a Reserved Concurrency of 1, there will be no parallel processing -- all messages will be handled by the one instance of the Lambda function
  • When the message is passed to the Lambda function, the invisibility timeout begins
  • When the Lambda function exits successfully, the Lambda service will delete the message from the SQS queue
  • However, if the invisibility timeout period is passed before the Lambda function finishes processing the message, then the message will reappear on the queue (the expectation being that the message failed to process). This will cause the message to be processed again in future.
  • It is also possible to send Amazon SQS a heartbeat to tell it that the message is still being processed and extend the invisibility timeout

In general, the invisibility timeout should be set high enough to ensure that it only triggers when something has gone wrong with the processing. For example, if it normally takes 1 minute to process a message, you might set the invisibility period to 4 minutes. This allows for unusual situation where processing might take longer, without accidentally reprocessing the message when it was actually successfully processed.

Your situation

You mention that "lambda waits until SQS queue timeout passes to start to process the next message". This suggests that something strange is happening with your process. Normally, when the Lambda function exits without an error, the Lambda service would invoke the Lambda function again with the next message. There is no direct link between message invisibility periods and when a Lambda function is executed -- that is, given your configuration, there is nothing that would 'kill' an existing Lambda function when the message timeout is reached, nor is there anything that would delay the next Lambda invocation until a message timeout has occurred. The function, however, will be 'killed' when the Lambda function timeout has been reached. Therefore, it sounds like your Lambda function is timing-out before it has finished processing the message. You should increase the Lambda function's timeout to be several times bigger than the expected run duration. (This is a separate configuration from the SQS queue invisibility period.)

Upvotes: 5

Related Questions