G. Dan
G. Dan

Reputation: 93

Need help on CloudFormation template and AWS lambda for pulling events from SQS to S3 via lambda

I am new to AWS CloudFormation, and I am trying to capture events from an SQS queue and place them in an S3 bucket via AWS Lambda. The flow of events is SNS --> SQS <-- Lambda ---> S3 bucket.

I am trying to achieve the above flow using a CloudFormation template. I am getting the error message below after deploying my CloudFormation template. Any help you can provide would be greatly appreciated. Thank you.

My Lambda function is:

import json
import logging

import boto3

# Configure the root logger first (timestamped "<time>: <level>: <message>"
# records at INFO), then keep a module-level handle to it pinned at INFO.
_LOG_FORMAT = '%(asctime)s: %(levelname)s: %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event, context):
    """Copy the body of the first SQS record in ``event`` to S3 as ``data.json``.

    Args:
        event: Lambda invocation payload. The handler reads
            ``event["Records"][0]["body"]`` and parses it as JSON.
        context: Lambda context object (unused).

    Raises:
        KeyError / IndexError: if ``event`` has no ``Records`` entry or it is empty.
        json.JSONDecodeError: if the record body is not valid JSON.

    NOTE(review): only the first record of the batch is written, and every
    invocation overwrites the same object key -- confirm this is intended.
    """
    # Function-scope import keeps this fix self-contained.
    import os

    logger.info(f"lambda_handler -- event: {json.dumps(event)}")

    # put_object is an S3 *client* operation and the service name is "s3";
    # the original boto3.resource("3") could never be created.
    s3_client = boto3.client("s3")

    # The destination bucket may be supplied through the S3_BUCKET_NAME
    # environment variable; the original literal is kept as the fallback.
    # NOTE(review): "S3DeployBucket" looks like a template logical ID rather
    # than a real bucket name -- confirm and configure the variable.
    bucket_name = os.environ.get("S3_BUCKET_NAME", "S3DeployBucket")

    event_message = json.loads(event["Records"][0]["body"])
    # boto3 keyword arguments are case-sensitive: "Key", not "key".
    s3_client.put_object(
        Bucket=bucket_name,
        Key="data.json",
        Body=json.dumps(event_message),
    )

My complete CloudFormation template is:

{
  "AWSTemplateFormatVersion": "2010-09-09",
  "Description": "myDemoApp Resource Stack",
  "Mappings": {

  },
  "Parameters": {
    "S3DeployBucket": {
      "Default": "myDemoApp-deploy-bucket",
      "Description": "Bucket for deployment configs and artifacts for myDemoApp",
      "Type": "String"
    },
    "EnvName": {
      "Description": "Platform environment name for myDemoApp",
      "Type": "String"
    },
    "AuditRecordKeyArn": {
      "Description": "ARN for audit record key encryption for myDemoApp",
      "Type": "String"
    },
    "ParentVPCStack": {
      "Description": "The name of the stack containing the parent VPC for myDemoApp",
      "Type": "String"
    },
    "StackVersion": {
      "Description": "The version of this stack of myDemoApp",
      "Type": "String"
    },
    "EventLogFolderName": {
      "Type": "String",
      "Description": "folder name for the logs for the event stream of myDemoApp",
      "Default": "event_log_stream"
    },
    "EventLogPartitionKeys": {
      "Type": "String",
      "Description": "The partition keys that audit logs will write to S3. Use Hive-style naming conventions for automatic Athena/Glue comprehension.",
      "Default": "year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}"
    },
    "AppEventSNSTopicArn": {
      "Description": "Events SNS Topic of myDemoApp",
      "Type": "String"
    },
    "ReportingEventsRetentionDays": {
      "Default": "2192",
      "Description": "The number of days to retain a record used for reporting.",
      "Type": "String"
    }
  },
  "Resources": {
    "AppEventSQSQueue": {
      "Type": "AWS::SQS::Queue"
    },

    "AppEventSnsSubscription": {
      "Type": "AWS::SNS::Subscription",
      "Properties": {
        "TopicArn": {
          "Ref": "AppEventSNSTopicArn"
        },
        "Endpoint": {
          "Fn::GetAtt": [
            "AppEventSQSQueue",
            "Arn"
          ]
        },
        "Protocol": "sqs"
      }
    },

    "S3DeployBucket": {
      "Type": "AWS::S3::Bucket",
      "DeletionPolicy": "Retain",
      "UpdateReplacePolicy": "Retain",
      "Properties": {
        "BucketEncryption": {
          "ServerSideEncryptionConfiguration": [
            {
              "ServerSideEncryptionByDefault": {
                "KMSMasterKeyID": {
                  "Ref": "AuditRecordKeyArn"
                },
                "SSEAlgorithm": "aws:kms"
              }
            }
          ]
        },
        "VersioningConfiguration": {
          "Status": "Enabled"
        },
        "LifecycleConfiguration": {
          "Rules": [
            {
              "ExpirationInDays": {
                "Ref": "ReportingEventsRetentionDays"
              },
              "Status": "Enabled"
            }
          ]
        }
      }
    },
    "EventStreamLogGroup": {
      "Type": "AWS::Logs::LogGroup"
    },
    "EventLogStream": {
      "Type": "AWS::Logs::LogStream",
      "Properties": {
        "LogGroupName": {
          "Ref": "EventStreamLogGroup"
        }
      }
    },
    "EventStreamSubscriptionRole": {
      "Type": "AWS::IAM::Role",
      "Properties": {
        "AssumeRolePolicyDocument": {
          "Version": "2012-10-17",
          "Statement": [
            {
              "Effect": "Allow",
              "Principal": {
                "Service": "sns.amazonaws.com"
              },
              "Action": "sts:AssumeRole"
            }
          ]
        },
        "Policies": [
          {
            "PolicyName": "SNSSQSAccessPolicy",
            "PolicyDocument": {
              "Version": "2012-10-17",
              "Statement": {
                "Action": [
                  "sqs:*"
                ],
                "Effect": "Allow",
                "Resource": "*"
              }
            }
          }
        ]
      }
    },
    "EventDeliveryRole": {
      "Type": "AWS::IAM::Role",
      "Properties": {
        "AssumeRolePolicyDocument": {
          "Version": "2012-10-17",
          "Statement": [
            {
              "Effect": "Allow",
              "Principal": {
                "Service": "sqs.amazonaws.com"
              },
              "Action": "sts:AssumeRole",
              "Condition": {
                "StringEquals": {
                  "sts:ExternalId": {
                    "Ref": "AWS::AccountId"
                  }
                }
              }
            }
          ]
        }
      }
    },
    "EventSqsQueuePolicy": {
      "Type": "AWS::SQS::QueuePolicy",
      "Properties": {
        "PolicyDocument": {
          "Version": "2012-10-17",
          "Id": "SqsQueuePolicy",
          "Statement": [
            {
              "Sid": "Allow-SNS-SendMessage",
              "Effect": "Allow",
              "Principal": {
                "Service": "sns.amazonaws.com"
              },
              "Action": [
                "sqs:SendMessage"
              ],
              "Resource": {
                "Fn::GetAtt": [
                  "AppEventSQSQueue",
                  "Arn"
                ]
              },
              "Condition": {
                "ArnEquals": {
                  "aws:SourceArn": {
                    "Ref": "AppEventSNSTopicArn"
                  }
                }
              }
            }
          ]
        },
        "Queues": [
          {
            "Ref": "AppEventSQSQueue"
          }
        ]
      }
    },
    "EventDeliveryPolicy": {
      "Type": "AWS::IAM::Policy",
      "Properties": {
        "PolicyName": "sqs_delivery_policy",
        "PolicyDocument": {
          "Version": "2012-10-17",
          "Statement": [
            {
              "Effect": "Allow",
              "Action": [
                "s3:PutObject"
              ],
              "Resource": [
                {
                  "Fn::GetAtt": [
                    "S3DeployBucket",
                    "Arn"
                  ]
                },
                {
                  "Fn::Join": [
                    "",
                    [
                      {
                        "Fn::GetAtt": [
                          "S3DeployBucket",
                          "Arn"
                        ]
                      },
                      "/*"
                    ]
                  ]
                }
              ]
            },
            {
              "Effect": "Allow",
              "Action": [
                "logs:PutLogEvents"
              ],
              "Resource": {
                "Fn::Sub": "arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:${EventStreamLogGroup}:log-stream:${EventLogStream}"
              }
            },
            {
              "Effect": "Allow",
              "Action": [
                "kms:Decrypt",
                "kms:GenerateDataKey"
              ],
              "Resource": [
                {
                  "Ref": "AuditRecordKeyArn"
                }
              ],
              "Condition": {
                "StringEquals": {
                  "kms:ViaService": {
                    "Fn::Join": [
                      "",
                      [
                        "s3.",
                        {
                          "Ref": "AWS::Region"
                        },
                        ".amazonaws.com"
                      ]
                    ]
                  }
                },
                "StringLike": {
                  "kms:EncryptionContext:aws:s3:arn": {
                    "Fn::Join": [
                      "",
                      [
                        {
                          "Fn::GetAtt": [
                            "S3DeployBucket",
                            "Arn"
                          ]
                        },
                        "/*"
                      ]
                    ]
                  }
                }
              }
            }
          ]
        },
        "Roles": [
          {
            "Ref": "EventDeliveryRole"
          },
          {
            "Ref": "SQSLambdaExecutionRole"
          }
        ]
      }
    },
    "EventStreamLambda": {
      "Type": "AWS::Lambda::Function",
      "Properties": {
        "Handler": "lambda_function.lambda_handler",
        "MemorySize": 128,
        "Runtime": "python3.8",
        "Timeout": 30,
        "FunctionName": "sqs_s3_pipeline_job",
        "Role": {
          "Fn::GetAtt": [
            "SQSLambdaExecutionRole",
            "Arn"
          ]
        },
        "Code": {
          "S3Bucket": {
            "Ref": "S3DeployBucket"
          },
          "S3Key": {
            "Ref": "S3DeployBucket"
          }
        },
        "TracingConfig": {
          "Mode": "Active"
        }
      }
    },
    "SQSLambdaExecutionRole": {
      "Type": "AWS::IAM::Role",
      "Properties": {
        "AssumeRolePolicyDocument": {
          "Version": "2012-10-17",
          "Statement": [
            {
              "Effect": "Allow",
              "Principal": {
                "Service": [
                  "lambda.amazonaws.com"
                ]
              },
              "Action": [
                "sts:AssumeRole"
              ]
            }
          ]
        },
        "Policies": [
          {
            "PolicyName": "StreamLambdaLogs",
            "PolicyDocument": {
              "Version": "2012-10-17",
              "Statement": [
                {
                  "Effect": "Allow",
                  "Action": [
                    "logs:*"
                  ],
                  "Resource": "arn:aws:logs:*:*:*"
                }
              ]
            }
          },
          {
            "PolicyName": "SQSLambdaPolicy",
            "PolicyDocument": {
              "Version": "2012-10-17",
              "Statement": [
                {
                  "Effect": "Allow",
                  "Action": [
                    "sqs:ReceiveMessage",
                    "sqs:DeleteMessage",
                    "sqs:GetQueueAttributes",
                    "sqs:ChangeMessageVisibility"
                  ],
                  "Resource":"*"
                }
              ]
            }
          }
        ]
      }
    }
  },
  "Outputs": {
    "VpcSubnet3ExportKey": {
      "Value": {
        "Fn::Sub": "${ParentVPCStack}-privateSubnet3"
      }
    }
  }
}

Upvotes: 1

Views: 426

Answers (1)

Marcin
Marcin

Reputation: 238687

SubscriptionRoleArn applies only to Kinesis Data Firehose subscriptions:

This property applies only to Amazon Kinesis Data Firehose delivery stream subscriptions.

Upvotes: 0

Related Questions