Sami
Sami

Reputation: 11

Python lambda function returns ErrorType KeyError

i am creating a lambda function for Certification expiration Alert, and getting this error message when Test run

Response

{
  "errorMessage": "'detail-type'",
  "errorType": "KeyError",
  "requestId": "5449b430-e32a-4645-93b0-f204e92ef6e6",
  "stackTrace": [
    "  File \"/var/task/lambda_function.py\", line 25, in lambda_handler\n    if (event ['detail-type'] == \"ACM Certificate Approaching Expiration\"):\n"
  ]
}

Function Logs

START RequestId: 5449b430-e32a-4645-93b0-f204e92ef6e6 Version: $LATEST
[ERROR] KeyError: 'detail-type'
Traceback (most recent call last):
  File "/var/task/lambda_function.py", line 25, in lambda_handler
    if (event ['detail-type'] == "ACM Certificate Approaching Expiration"):END RequestId: 5449b430-e32a-4645-93b0-f204e92ef6e6
REPORT RequestId: 5449b430-e32a-4645-93b0-f204e92ef6e6  Duration: 1.54 ms   Billed Duration: 2 ms   Memory Size: 128 MB Max Memory Used: 53 MB  Init Duration: 233.48 ms

Request ID 5449b430-e32a-4645-93b0-f204e92ef6e6

here it is the full lambda funtion

import json
import boto3
import os
from datetime import datetime, timedelta, timezone

# -------------------------------------------
# setup global data
# -------------------------------------------
utc = timezone.utc

# make today timezone aware
today = datetime.now().replace(tzinfo=utc)

# set up time window for alert - default to 45 if its missing
if os.environ.get('EXPIRY_DAYS') is None:
    expiry_days = 45
else:
    expiry_days = int(os.environ['EXPIRY_DAYS'])

expiry_window = today + timedelta(days = expiry_days)

def lambda_handler(event, context):

    # if this is coming from the ACM event, its for a single certificate
    if (event ['detail-type'] == "ACM Certificate Approaching Expiration"):
        
        response = handle_single_cert (event, context.invoked_function_arn)

    # otherwise, we need to get all the expiring certs that are expiring from CloudWatch Metrics
    else:
        response = handle_multiple_certs(event, context.invoked_function_arn)
    
    return {
        'statusCode': 200,
        'body': response 
    }


def handle_single_cert(event, context_arn):
    cert_client = boto3.client('acm')

    cert_details = cert_client.describe_certificate(CertificateArn=event['resources'][0])

    result = 'The following certificate is expiring within ' + str(expiry_days) + ' days: ' + cert_details['Certificate']['DomainName']
    
    # check the expiry window before logging to Security Hub and sending an SNS
    if cert_details['Certificate']['NotAfter'] < expiry_window:
        # This call is the text going into the SNS notification
        result = result + ' (' + cert_details['Certificate']['CertificateArn'] + ') '

        # this call is publishing to SH
        result = result + ' - ' + log_finding_to_sh(event, cert_details, context_arn)
        
        # if there's an SNS topic, publish a notification to it
        if os.environ.get('SNS_TOPIC_ARN') is None:
            response = result
        else:
            sns_client = boto3.client('sns')
            response = sns_client.publish(TopicArn=os.environ['SNS_TOPIC_ARN'], Message=result, Subject='Certificate Expiration Notification')
        
    return result

def handle_multiple_certs(event, context_arn):
    cert_client = boto3.client('acm')

    cert_list = json.loads(get_expiring_cert_arns())
    
    if cert_list is None:
        response = 'No certificates are expiring within ' + str(expiry_days) + ' days.'

    else:
        response = 'The following certificates are expiring within ' + str(expiry_days) + ' days: \n'

        # loop through the cert list and pull out certs that are expiring within the expiry window
        for csl in cert_list:
            cert_arn = json.dumps(csl['Dimensions'][0]['Value']).replace('\"', '')
            cert_details = cert_client.describe_certificate(CertificateArn=cert_arn)

            if cert_details['Certificate']['NotAfter'] < expiry_window:
                current_cert = 'Domain:' + cert_details['Certificate']['DomainName'] + ' (' + cert_details['Certificate']['CertificateArn'] + '), \n'
                print(current_cert)

                # this is publishing to SH
                result = log_finding_to_sh(event, cert_details, context_arn)

                # This is the text going into the SNS notification
                response = response + current_cert
                
    # if there's an SNS topic, publish a notification to it
    if os.environ.get('SNS_TOPIC_ARN') is not None:
        sns_client = boto3.client('sns')
        response = sns_client.publish(TopicArn=os.environ['SNS_TOPIC_ARN'], Message=response.rstrip(', \n'), Subject='Certificate Expiration Notification')

    return response

def log_finding_to_sh(event, cert_details, context_arn):
    # setup for security hub
    sh_region = get_sh_region(event['region'])
    sh_hub_arn = "arn:aws:securityhub:{0}:{1}:hub/default".format(sh_region, event['account'])
    sh_product_arn = "arn:aws:securityhub:{0}:{1}:product/{1}/default".format(sh_region, event['account'])

    # check if security hub is enabled, and if the hub arn exists
    sh_client = boto3.client('securityhub', region_name = sh_region)
    try:
        sh_enabled = sh_client.describe_hub(HubArn = sh_hub_arn)

    # the previous command throws an error indicating the hub doesn't exist or lambda doesn't have rights to it so we'll stop attempting to use it
    except Exception as error:
        sh_enabled = None
        print ('Default Security Hub product doesn\'t exist')
        response = 'Security Hub disabled'
    
    # This is used to generate the URL to the cert in the Security Hub Findings to link directly to it
    cert_id = right(cert_details['Certificate']['CertificateArn'], 36)

    if sh_enabled:
        # set up a new findings list
        new_findings = []
    
            # add expiring certificate to the new findings list
        new_findings.append({
            "SchemaVersion": "2018-10-08",
            "Id": cert_id,
            "ProductArn": sh_product_arn,
            "GeneratorId": context_arn,
            "AwsAccountId": event['account'],
            "Types": [
                "Software and Configuration Checks/AWS Config Analysis"
            ],
            "CreatedAt": event['time'],
            "UpdatedAt": event['time'],
            "Severity": {
                "Original": '89.0',
                "Label": 'HIGH'
            },
            "Title": 'Certificate expiration',
            "Description": 'cert expiry',
            'Remediation': {
                'Recommendation': {
                    'Text': 'A new certificate for ' + cert_details['Certificate']['DomainName'] + ' should be imported to replace the existing imported certificate before expiration',
                    'Url': "https://console.aws.amazon.com/acm/home?region=" + event['region'] + "#/?id=" + cert_id
                }
            },
            'Resources': [
                {
                    'Id': event['id'],
                    'Type': 'ACM Certificate',
                    'Partition': 'aws',
                    'Region': event['region']
                }
            ],
            'Compliance': {'Status': 'WARNING'}
        })
    
        # push any new findings to security hub
        if new_findings:
            try:
                response = sh_client.batch_import_findings(Findings=new_findings)
    
                if response['FailedCount'] > 0:
                    print("Failed to import {} findings".format(response['FailedCount']))
    
            except Exception as error:
                print("Error: ", error)
                raise
            
    return json.dumps(response)

def get_expiring_cert_arns():
    cert_list = []
    
    # Create CloudWatch client
    cloudwatch = boto3.client('cloudwatch')
    
    paginator = cloudwatch.get_paginator('list_metrics')
    
    for response in paginator.paginate(
        MetricName='DaysToExpiry',
        Namespace='AWS/CertificateManager',
        Dimensions=[{'Name': 'CertificateArn'}],):
            cert_list = cert_list + (response['Metrics'])
            
    # return all certs that are expiring according to CW
    return json.dumps(cert_list)

# function to setup the sh region    
def get_sh_region(event_region):
    # security hub findings may need to go to a different region so set that here
    if os.environ.get('SECURITY_HUB_REGION') is None:
        sh_region_local = event_region
    else:
        sh_region_local = os.environ['SECURITY_HUB_REGION']
    
    return sh_region_local
    
# quick function to trim off right side of a string
def right(value, count):
    # To get right part of string, use negative first index in slice.
    return value[-count:] 


  [1]: https://i.sstatic.net/uGjJ4.png

Upvotes: 1

Views: 4337

Answers (2)

Pierre
Pierre

Reputation: 2902

When you test your lambda function, you can create a JSON string to be used as a test event. Make sure that the test event has all the attributes that it needs. For example, a test lambda event is initially set to this:

{
  "key1": "value1",
  "key2": "value2",
  "key3": "value3"
}

You should edit the test event and add all the items that your code expects (find all your occurrences of event['someKey']. In your code, the list of keys I see are:

{
  "detail-type": "some value",
  "resources": "some list maybe with square brackets here",
  "region": "some-regions",
  "time": "some time",
  "id": "998877",
  "account": "12344567",
}

Once all the event['keys'] are part of the test event, you should not see your error anymore.

Upvotes: 0

Adil Hindistan
Adil Hindistan

Reputation: 6605

You can first print the event to help you troubleshoot it by checking whether you are actually getting the key you expect ('detail-type'):

import json
print(f"Received: {json.dumps(obj=event, indent=2)})"

If there is a chance that you will not have 'detail-type', then you can either use try/except to deal with the key error, or use if event.get('detail-type') to guard against it.

Upvotes: 1

Related Questions