user6882757

How to push data from DynamoDB through a stream

Below is the JSON file:

[
    {
        "year": 2013,
        "title": "Rush",
        "actors": [
            "Daniel Bruhl",
            "Chris Hemsworth",
            "Olivia Wilde"
        ]
    },
    {
        "year": 2013,
        "title": "Prisoners",
        "actors": [
            "Hugh Jackman",
            "Jake Gyllenhaal",
            "Viola Davis"
        ]
    }
]

Below is the code that pushes the data to DynamoDB. I created a bucket named testjsonbucket and saved the JSON above in it as moviedataten.json. I also created a DynamoDB table with year (Number) as the primary partition key and title (String) as the primary sort key.

import json
from decimal import Decimal

import boto3

s3 = boto3.resource('s3')
obj = s3.Object('testjsonbucket', 'moviedataten.json')


def load_movies(movies, dynamodb=None):
    # Fall back to the default DynamoDB resource when none is injected
    if not dynamodb:
        dynamodb = boto3.resource('dynamodb')

    table = dynamodb.Table('Movies')
    for movie in movies:
        year = int(movie['year'])
        title = movie['title']
        print("Adding movie:", year, title)
        table.put_item(Item=movie)


def lambda_handler(event, context):
    # s3.Object has no .json attribute; get() returns the content as a streaming body of bytes
    body = obj.get()['Body'].read()
    # DynamoDB rejects Python floats, so parse any floats as Decimal
    movie_list = json.loads(body, parse_float=Decimal)
    load_movies(movie_list)

My requirement:

Any change in DynamoDB has to be reflected in Elasticsearch. How can I achieve this?

Upvotes: 2

Views: 1338

Answers (2)

Chris Williams

Reputation: 35146

DynamoDB has a built-in feature, DynamoDB Streams, that handles the stream part of this question.

When you enable a stream, you choose one of the following view types, which determines what is written to each stream record:

  • KEYS_ONLY — Only the key attributes of the modified item.
  • NEW_IMAGE — The entire item, as it appears after it was modified.
  • OLD_IMAGE — The entire item, as it appeared before it was modified.
  • NEW_AND_OLD_IMAGES — Both the new and the old images of the item.
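
As a rough sketch of how a view type might be selected when enabling the stream from Python: the helper below assumes a boto3 DynamoDB client is passed in and that the table is the Movies table from the question. The name enable_stream is hypothetical; update_table and its StreamSpecification parameter are the DynamoDB API being called.

```python
VALID_VIEW_TYPES = {"KEYS_ONLY", "NEW_IMAGE", "OLD_IMAGE", "NEW_AND_OLD_IMAGES"}


def enable_stream(client, table_name, view_type="NEW_AND_OLD_IMAGES"):
    """Enable a DynamoDB stream on an existing table.

    `client` is expected to be boto3.client("dynamodb"); it is passed in
    so the helper can be exercised without AWS credentials.
    """
    if view_type not in VALID_VIEW_TYPES:
        raise ValueError(f"Unknown stream view type: {view_type}")
    return client.update_table(
        TableName=table_name,
        StreamSpecification={
            "StreamEnabled": True,
            "StreamViewType": view_type,
        },
    )
```

It could be called as enable_stream(boto3.client("dynamodb"), "Movies"). NEW_AND_OLD_IMAGES is used as the default here only because it matches the sample event; a smaller view type may be enough if the consumer needs just the new item.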

With NEW_AND_OLD_IMAGES selected, the stream delivers an event to the consumer that looks like the following:

{
   "Records":[
      {
         "eventID":"1",
         "eventName":"INSERT",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"111",
            "SizeBytes":26,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"2",
         "eventName":"MODIFY",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"222",
            "SizeBytes":59,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"3",
         "eventName":"REMOVE",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"333",
            "SizeBytes":38,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      }
   ]
}

As you're already familiar with Lambda, it makes sense to use a Lambda function to consume the records, convert each one into the document format your Elasticsearch index expects, and then add it to the index.

When doing this, make sure that you iterate through every record in the event, as a single invocation may carry multiple records depending on the batch size you configure.
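
A minimal sketch of such a consumer in Python is shown below. It assumes the stream view type includes the new image, uses a hypothetical index name movies, and builds the document id by joining the key values, which is only one possible choice. The unmarshal helper is a simplified stand-in that handles a few DynamoDB types; boto3 ships a fuller TypeDeserializer in boto3.dynamodb.types. Sending the request to the cluster is left out, since that depends on how your Elasticsearch domain is secured.

```python
import json
from decimal import Decimal


def unmarshal(attr):
    """Convert one DynamoDB-typed value ({"S": ...}, {"N": ...}, ...) to plain Python."""
    (type_tag, value), = attr.items()
    if type_tag == "S" or type_tag == "BOOL":
        return value
    if type_tag == "N":
        number = Decimal(value)
        return int(number) if number == number.to_integral_value() else float(number)
    if type_tag == "NULL":
        return None
    if type_tag == "L":
        return [unmarshal(item) for item in value]
    if type_tag == "M":
        return {key: unmarshal(item) for key, item in value.items()}
    raise ValueError(f"Unsupported DynamoDB type in this sketch: {type_tag}")


def document_id(keys):
    """Join the key attribute values (sorted by attribute name) into one id."""
    plain = {name: unmarshal(value) for name, value in keys.items()}
    return "|".join(str(plain[name]) for name in sorted(plain))


def build_bulk_body(event, index):
    """Turn every stream record into lines for the Elasticsearch bulk API."""
    lines = []
    for record in event.get("Records", []):
        change = record["dynamodb"]
        doc_id = document_id(change["Keys"])
        if record["eventName"] == "REMOVE":
            lines.append(json.dumps({"delete": {"_index": index, "_id": doc_id}}))
        elif "NewImage" in change:
            # INSERT and MODIFY both carry the item as it looks after the change
            document = {k: unmarshal(v) for k, v in change["NewImage"].items()}
            lines.append(json.dumps({"index": {"_index": index, "_id": doc_id}}))
            lines.append(json.dumps(document))
    # The bulk API expects newline-delimited JSON ending with a newline
    return "\n".join(lines) + "\n" if lines else ""


def lambda_handler(event, context):
    body = build_bulk_body(event, index="movies")  # "movies" is an assumed index name
    # POSTing `body` to the cluster's _bulk endpoint would go here.
    return {"records": len(event.get("Records", [])), "bulk_lines": body.count("\n")}
```

Indexing with the same id on INSERT and MODIFY means a modified item overwrites its earlier document instead of creating a second one.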

For more information on the steps required for the Lambda side of the function check out the Tutorial: Using AWS Lambda with Amazon DynamoDB streams page.

Upvotes: 1

Adiii

Reputation: 59906

This Lambda only writes the document to DynamoDB, and I would not recommend adding code to it that also pushes the same object to Elasticsearch. A Lambda function should perform a single task, and pushing the same document to ELK should be handled through a DynamoDB stream.

  • What if ELK is down or unavailable? How will you handle that inside this Lambda?
  • What if you want to disable this in the future? You would have to modify the Lambda instead of controlling it from the AWS API or the AWS console. With a stream, you just disable the stream when required, with no changes to the Lambda code above.
  • What if you want to move only modified or TTL-expired items to Elasticsearch?

So create a DynamoDB stream that pushes the documents to another Lambda, which is responsible for pushing them to ELK. With this option you can also push both the old and the new item.
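
To illustrate the point about forwarding only modified or TTL-expired items, here is a small Python sketch of how the second Lambda might filter stream records before indexing. The function names are hypothetical. The TTL check relies on DynamoDB marking TTL deletions with a userIdentity field whose type is Service and whose principalId is dynamodb.amazonaws.com.

```python
def is_ttl_delete(record):
    """True when the record is a deletion performed by DynamoDB's TTL process."""
    identity = record.get("userIdentity", {})
    return (
        record.get("eventName") == "REMOVE"
        and identity.get("type") == "Service"
        and identity.get("principalId") == "dynamodb.amazonaws.com"
    )


def select_records(event, wanted=("MODIFY",), include_ttl_deletes=True):
    """Keep records whose event name is wanted, plus TTL deletions if requested."""
    selected = []
    for record in event.get("Records", []):
        if record.get("eventName") in wanted:
            selected.append(record)
        elif include_ttl_deletes and is_ttl_delete(record):
            selected.append(record)
    return selected
```

Because the filter lives in the stream consumer, changing which events reach Elasticsearch does not require touching the Lambda that writes to DynamoDB.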

You can also look into this article, which describes another approach: data-streaming-from-dynamodb-to-elasticsearch

For the approach above, look into this GitHub project: dynamodb-stream-elasticsearch.

const { pushStream } = require('dynamodb-stream-elasticsearch');

const { ES_ENDPOINT, INDEX, TYPE } = process.env;

function myHandler(event, context, callback) {
  console.log('Received event:', JSON.stringify(event, null, 2));
  pushStream({ event, endpoint: ES_ENDPOINT, index: INDEX, type: TYPE })
    .then(() => {
      callback(null, `Successfully processed ${event.Records.length} records.`);
    })
    .catch((e) => {
      callback(`Error ${e}`, null);
    });
}

exports.handler = myHandler;

Upvotes: 2
