Below is the JSON file:
[
    {
        "year": 2013,
        "title": "Rush",
        "actors": [
            "Daniel Bruhl",
            "Chris Hemsworth",
            "Olivia Wilde"
        ]
    },
    {
        "year": 2013,
        "title": "Prisoners",
        "actors": [
            "Hugh Jackman",
            "Jake Gyllenhaal",
            "Viola Davis"
        ]
    }
]
Below is the code to push the data to DynamoDB. I have created a bucket named testjsonbucket, saved the above JSON in it as moviedataten.json, and created a DynamoDB table with year (Number) as the primary partition key and title (String) as the primary sort key.
import json
from decimal import Decimal

import boto3

s3 = boto3.resource('s3')
obj = s3.Object('testjsonbucket', 'moviedataten.json')
body = obj.get()['Body'].read()  # read the raw JSON from S3 (obj.json is not a valid attribute)

def load_movies(movies, dynamodb=None):
    if not dynamodb:
        dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('Movies')
    for movie in movies:
        year = int(movie['year'])
        title = movie['title']
        print("Adding movie:", year, title)
        table.put_item(Item=movie)

def lambda_handler(event, context):
    # parse_float=Decimal because DynamoDB does not accept Python floats
    movie_list = json.loads(body, parse_float=Decimal)
    load_movies(movie_list)
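For reference, this is roughly how a table with that key schema can be created with boto3 (a sketch; the billing mode here is just an example, not something from my setup):

import boto3

dynamodb = boto3.resource('dynamodb')

# year (Number) as partition key, title (String) as sort key,
# matching the schema described above.
table = dynamodb.create_table(
    TableName='Movies',
    KeySchema=[
        {'AttributeName': 'year', 'KeyType': 'HASH'},
        {'AttributeName': 'title', 'KeyType': 'RANGE'},
    ],
    AttributeDefinitions=[
        {'AttributeName': 'year', 'AttributeType': 'N'},
        {'AttributeName': 'title', 'AttributeType': 'S'},
    ],
    BillingMode='PAY_PER_REQUEST',  # example setting
)
table.wait_until_exists()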
https://xx.x.x.com/testelas
My Requirement:
Any change in DynamoDB has to be reflected in Elasticsearch.
Upvotes: 2
Views: 1338
Reputation: 35146
DynamoDB has a built-in feature (DynamoDB Streams) that will handle the stream part of this question.
When you configure it, you have a choice of the following view types:
KEYS_ONLY — Only the key attributes of the modified item.
NEW_IMAGE — The entire item, as it appears after it was modified.
OLD_IMAGE — The entire item, as it appeared before it was modified.
NEW_AND_OLD_IMAGES — Both the new and the old images of the item.

This will produce an event that looks like the following:
{
    "Records": [
        {
            "eventID": "1",
            "eventName": "INSERT",
            "eventVersion": "1.0",
            "eventSource": "aws:dynamodb",
            "awsRegion": "us-east-1",
            "dynamodb": {
                "Keys": {
                    "Id": { "N": "101" }
                },
                "NewImage": {
                    "Message": { "S": "New item!" },
                    "Id": { "N": "101" }
                },
                "SequenceNumber": "111",
                "SizeBytes": 26,
                "StreamViewType": "NEW_AND_OLD_IMAGES"
            },
            "eventSourceARN": "stream-ARN"
        },
        {
            "eventID": "2",
            "eventName": "MODIFY",
            "eventVersion": "1.0",
            "eventSource": "aws:dynamodb",
            "awsRegion": "us-east-1",
            "dynamodb": {
                "Keys": {
                    "Id": { "N": "101" }
                },
                "NewImage": {
                    "Message": { "S": "This item has changed" },
                    "Id": { "N": "101" }
                },
                "OldImage": {
                    "Message": { "S": "New item!" },
                    "Id": { "N": "101" }
                },
                "SequenceNumber": "222",
                "SizeBytes": 59,
                "StreamViewType": "NEW_AND_OLD_IMAGES"
            },
            "eventSourceARN": "stream-ARN"
        },
        {
            "eventID": "3",
            "eventName": "REMOVE",
            "eventVersion": "1.0",
            "eventSource": "aws:dynamodb",
            "awsRegion": "us-east-1",
            "dynamodb": {
                "Keys": {
                    "Id": { "N": "101" }
                },
                "OldImage": {
                    "Message": { "S": "This item has changed" },
                    "Id": { "N": "101" }
                },
                "SequenceNumber": "333",
                "SizeBytes": 38,
                "StreamViewType": "NEW_AND_OLD_IMAGES"
            },
            "eventSourceARN": "stream-ARN"
        }
    ]
}
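As a side note, if a stream isn't enabled on the table yet, you can turn one on with boto3; a minimal sketch, assuming the Movies table from the question:

import boto3

client = boto3.client('dynamodb')

# Enable a stream on the existing Movies table. NEW_AND_OLD_IMAGES emits
# both versions of each item, matching the sample events above.
response = client.update_table(
    TableName='Movies',
    StreamSpecification={
        'StreamEnabled': True,
        'StreamViewType': 'NEW_AND_OLD_IMAGES',
    },
)
print(response['TableDescription']['LatestStreamArn'])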
As you're already familiar with Lambda, it makes sense to use a Lambda function to consume the records, iterating through each one to transform it into your Elasticsearch document format before adding it to your index.
When doing this, make sure you process every record in the batch, as a single event may contain multiple records depending on your configuration. A sketch of such a consumer follows below.
For more information on the steps required for the Lambda side, check out the Tutorial: Using AWS Lambda with Amazon DynamoDB streams page.
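To give a rough idea of that consumer, here is a minimal Python sketch. The index name 'movies', the placeholder endpoint, and the assumption that the elasticsearch client is packaged with the function (and that the endpoint is reachable without request signing) are all illustrative, not from the question:

import boto3
from boto3.dynamodb.types import TypeDeserializer
from elasticsearch import Elasticsearch

# Placeholder endpoint and index; substitute your own.
es = Elasticsearch(['https://xx.x.x.com'])
deserializer = TypeDeserializer()

def lambda_handler(event, context):
    # A single invocation can carry several records, so handle all of them.
    for record in event['Records']:
        keys = record['dynamodb']['Keys']
        # Derive a stable document id from the table's key attributes.
        doc_id = '-'.join(str(deserializer.deserialize(v)) for v in keys.values())
        if record['eventName'] in ('INSERT', 'MODIFY'):
            # Convert the DynamoDB-typed NewImage into a plain Python dict.
            image = record['dynamodb']['NewImage']
            doc = {k: deserializer.deserialize(v) for k, v in image.items()}
            es.index(index='movies', id=doc_id, body=doc)
        elif record['eventName'] == 'REMOVE':
            es.delete(index='movies', id=doc_id, ignore=[404])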
Upvotes: 1
Reputation: 59906
This Lambda just writes the document to DynamoDB, and I would not recommend adding code to it that also pushes the same object to Elasticsearch: a Lambda function should perform a single task, and pushing the document to Elasticsearch should be driven by a DynamoDB stream.
So create a DynamoDB stream that triggers another Lambda responsible for pushing the document to Elasticsearch; with this option you can also push both the old and the new item.
You can also look into this article, which describes another approach: data-streaming-from-dynamodb-to-elasticsearch
For the above approach, look into this GitHub project: dynamodb-stream-elasticsearch.
const { pushStream } = require('dynamodb-stream-elasticsearch');

const { ES_ENDPOINT, INDEX, TYPE } = process.env;

function myHandler(event, context, callback) {
  console.log('Received event:', JSON.stringify(event, null, 2));
  pushStream({ event, endpoint: ES_ENDPOINT, index: INDEX, type: TYPE })
    .then(() => {
      callback(null, `Successfully processed ${event.Records.length} records.`);
    })
    .catch((e) => {
      callback(`Error ${e}`, null);
    });
}

exports.handler = myHandler;
Upvotes: 2