Reputation: 1
I am trying to publish two specific CloudTrail events to S3.
Event names:
AddMemberToGroup
RemoveMemberFromGroup
I cannot find an option to store only these two event logs in an S3 bucket. The logs and the S3 bucket are in the same account. Later I will use these events in S3 as a trigger for a Lambda function that extracts member_id and group_id from the log. The member_id will be mapped to the actual email address, and an email will be sent via SNS. Please help me create a trail that publishes only these 2 events to S3.
Upvotes: -1
Views: 317
Reputation: 1
I implemented this architecture with a Lambda function and EventBridge rules.
EventBridge consumes the CloudTrail events and picks out the desired ones.
I created the rule below to match AddMemberToGroup and RemoveMemberFromGroup:
{
  "detail-type": ["AWS API Call via CloudTrail"],
  "source": ["aws.sso-directory"],
  "detail": {
    "eventSource": ["sso-directory.amazonaws.com"],
    "errorCode": [{
      "exists": false
    }],
    "eventName": ["RemoveMemberFromGroup", "AddMemberToGroup"]
  }
}
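To sanity-check the pattern locally before deploying it, a simplified matcher can be sketched in Python. This is only an approximation of EventBridge matching that covers the two forms used above (lists of exact values and `exists`); the `matches` and `sample_event` helpers and the sample events are hypothetical, not part of any AWS SDK.

```python
import json

# The event pattern from the rule above, as a Python dict.
PATTERN = {
    "detail-type": ["AWS API Call via CloudTrail"],
    "source": ["aws.sso-directory"],
    "detail": {
        "eventSource": ["sso-directory.amazonaws.com"],
        "errorCode": [{"exists": False}],
        "eventName": ["RemoveMemberFromGroup", "AddMemberToGroup"],
    },
}


def matches(pattern, event):
    """Simplified approximation of EventBridge matching (hypothetical helper).

    Supports only nested objects, lists of exact values, and {"exists": bool}.
    """
    for key, expected in pattern.items():
        if isinstance(expected, dict):
            # Nested pattern: the event must hold a nested object that matches.
            child = event.get(key)
            if not isinstance(child, dict) or not matches(expected, child):
                return False
            continue
        present = key in event
        ok = False
        for candidate in expected:
            if isinstance(candidate, dict) and "exists" in candidate:
                # {"exists": false} matches only when the field is absent.
                ok = ok or (candidate["exists"] == present)
            elif present and event[key] == candidate:
                ok = True
        if not ok:
            return False
    return True


def sample_event(event_name, error_code=None):
    """Build a minimal, made-up event with only the fields the pattern inspects."""
    detail = {"eventSource": "sso-directory.amazonaws.com", "eventName": event_name}
    if error_code is not None:
        detail["errorCode"] = error_code
    return {
        "detail-type": "AWS API Call via CloudTrail",
        "source": "aws.sso-directory",
        "detail": detail,
    }


if __name__ == "__main__":
    print(json.dumps(PATTERN, indent=2))  # JSON to paste into the rule
    print(matches(PATTERN, sample_event("AddMemberToGroup")))
    print(matches(PATTERN, sample_event("CreateGroup")))
```

The `errorCode` entry is what keeps failed API calls out: a sample event that carries any `errorCode` value is rejected by the matcher, mirroring the intent of `"exists": false` in the rule.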
The target of this EventBridge rule was set to a Lambda function. For an AddMemberToGroup or RemoveMemberFromGroup event, the function fetches the name of the SSO group and the member details. These details are then sent by email using Amazon Simple Email Service (SES); in the code below, the email is sent only when the group name contains "admin".
Lambda Code:
import json
import boto3
import logging

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)

# Identity Store ID used by both lookups.
IDENTITY_STORE_ID = 'd-123xyzabc12'


def get_group_name(identitystore_client, group_id):
    """Return the group's display name, or None if the lookup fails."""
    try:
        response = identitystore_client.describe_group(IdentityStoreId=IDENTITY_STORE_ID, GroupId=group_id)
        print(f"group metadata:{response}")
        groupname = response['DisplayName']
        return groupname
    except Exception as e:
        LOGGER.error(f"Error retrieving group name: {str(e)}")
        return None


def get_member_name(identitystore_client, member_id):
    """Return [user name, display name], or None if the lookup fails."""
    try:
        response = identitystore_client.describe_user(IdentityStoreId=IDENTITY_STORE_ID, UserId=member_id)
        print(f"User metadata:{response}")
        membername = response['UserName']
        # DisplayName may be missing from the response, so do not fail on it.
        displayname = response.get('DisplayName')
        print(displayname)
        return [membername, displayname]
    except Exception as e:
        LOGGER.error(f"Error retrieving member name: {str(e)}")
        return None
def lambda_handler(event, context):
    LOGGER.info(event)
    ses = boto3.client('ses')
    identitystore_client = boto3.client('identitystore')
    source_email = 'provide your source email'
    email_arn = 'pls provide your email arn'
    destination_emails = ['provide your destination mail id']

    request_parameters = event.get('detail', {}).get('requestParameters', {})
    group_id = str(request_parameters.get('groupId'))
    print(f'group id type - {type(group_id)}')
    print(f'groupId:{group_id}')
    event_name = str(event.get('detail', {}).get('eventName'))

    # AddMemberToGroup nests the ID under "member"; otherwise it is read
    # from the top level of requestParameters.
    if event_name == 'AddMemberToGroup':
        member_id = str(request_parameters.get('member', {}).get('memberId'))
    else:
        member_id = str(request_parameters.get('memberId'))
    print(f'member id type - {type(member_id)}')
    print(f'member_id:{member_id}')

    # Both helpers return None on failure; str() turns that into 'None'.
    member_name = str(get_member_name(identitystore_client, member_id))
    group_name = str(get_group_name(identitystore_client, group_id))

    email_content = f"""
    <p align="left">Dear team,<br><br>
    There is a change:<br><br>
    <b>Event Name</b>: {event_name}<br>
    <b>User</b>: {member_name}<br>
    <b>LDAP Group</b>: {group_name}<br>
    </p>
    """

    # Send the email using SES, but only for groups whose name contains "admin"
    if 'admin' in group_name.lower():
        ses.send_email(
            Source=source_email,
            SourceArn=email_arn,
            Destination={'ToAddresses': destination_emails},
            Message={
                'Subject': {'Data': 'Alert!!!'},
                'Body': {'Html': {'Data': email_content}}
            }
        )

    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }
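The ID extraction in the handler can be exercised without AWS by pulling it into a small pure function. The sketch below is a hypothetical helper (`extract_ids` is not part of the code above), and the two sample events are made up; their shapes are assumed from the handler itself, which reads `member.memberId` for AddMemberToGroup and a top-level `memberId` otherwise. Real events may differ, so check one in the Lambda logs first.

```python
def extract_ids(event):
    """Return (event_name, group_id, member_id) from an EventBridge event.

    Hypothetical helper; accepts both requestParameters shapes the handler
    above expects instead of branching on the event name.
    """
    detail = event.get("detail") or {}
    params = detail.get("requestParameters") or {}
    nested = params.get("member") or {}
    member_id = nested.get("memberId") or params.get("memberId")
    return detail.get("eventName"), params.get("groupId"), member_id


# Made-up sample events (IDs are placeholders, shapes are assumptions).
ADD_EVENT = {
    "detail": {
        "eventName": "AddMemberToGroup",
        "requestParameters": {"groupId": "g-111", "member": {"memberId": "u-222"}},
    }
}
REMOVE_EVENT = {
    "detail": {
        "eventName": "RemoveMemberFromGroup",
        "requestParameters": {"groupId": "g-111", "memberId": "u-222"},
    }
}

if __name__ == "__main__":
    print(extract_ids(ADD_EVENT))
    print(extract_ids(REMOVE_EVENT))
```

Unlike the handler, this returns `None` rather than the string `'None'` when a field is missing, which makes it easier to skip the Identity Store lookups for malformed events.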
Upvotes: 0
Reputation: 7903
I'm not sure that you can create a trail that sends only those events. I think the trail has to send all events, and you do the filtering in the Lambda (which is bad, because you'll trigger a lot of unnecessary Lambda invocations :( ).
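A minimal sketch of that in-Lambda filtering, assuming the trail delivers its usual gzip-compressed JSON log files with a top-level `Records` array. `filter_records` and the sample log content are hypothetical; in a real function the bytes would come from reading the S3 object named in the trigger event.

```python
import gzip
import json

# The two event names from the question.
WANTED_EVENTS = {"AddMemberToGroup", "RemoveMemberFromGroup"}


def filter_records(gz_bytes, wanted=WANTED_EVENTS):
    """Return the records in one CloudTrail log file whose eventName is wanted."""
    log = json.loads(gzip.decompress(gz_bytes))
    return [r for r in log.get("Records", []) if r.get("eventName") in wanted]


# Made-up log file content for a local check (not real CloudTrail output).
fake_log = {
    "Records": [
        {"eventName": "AddMemberToGroup", "requestParameters": {"groupId": "g-1"}},
        {"eventName": "DescribeGroup"},
        {"eventName": "RemoveMemberFromGroup", "requestParameters": {"groupId": "g-2"}},
    ]
}
kept = filter_records(gzip.compress(json.dumps(fake_log).encode("utf-8")))
print([r["eventName"] for r in kept])
```

Every delivered log file still triggers an invocation with this approach; the filter only decides which records are acted on.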
Upvotes: 0