DevStarlight
DevStarlight

Reputation: 804

SQS to Lambda + SES

I'm new with Lambda & SQS and I'm trying to create a function to send emails, queued in an SQS service, but I don't understand how to call the process function that contains the send + delete queue methods.

Here bellow I paste my code:

'use strict';

const AWS = require('aws-sdk');

const SQS = new AWS.SQS({ apiVersion: '2012-11-05' });
const Lambda = new AWS.Lambda({ apiVersion: '2015-03-31' });
const ses = new AWS.SES({ accessKeyId: "xxxxxxxx", secretAccesskey: "xxxxxxx/xxxxxxxxx" });
const s3 = new AWS.S3({ apiVersion: "2006-03-01", region: "us-west-2" });


const QUEUE_URL = 'https://sqs.us-west-2.amazonaws.com/xxxxxxx/queue';
const PROCESS_MESSAGE = 'process-message';

function getPieceOfMail (path, mapObj, replace) {
  return new Promise(function (resolve, reject) {
    s3.getObject({
      Bucket: "myBucket",
      Key: "myKey/" + path
    }, function (err, data) {
      if (err) {
        reject(err);
      } else {
        if (replace === true) {
            var re = new RegExp(Object.keys(mapObj).join("|"), "gi");
            data = data.Body.toString().replace(re, function (matched) {
              return mapObj[matched.toLowerCase()];
            });
            resolve(data);
        } else {
            resolve(data.Body.toString());
        }
      }
    });
  });
}

function getRegisterSource (nickname, activate_link) {
  var activate_link, pieces;

  pieces = [
    getPieceOfMail("starts/start.html", {}, false),
    getPieceOfMail("headers/a.html", {}, false),
    getPieceOfMail("footers/a.html", {}, false),
  ];

  return Promise.all(pieces)
    .then(function (data) {
      return (data[0] + data[1] + data[2]);
    })
    .catch(function (err) {
      return err;
    });
}

function sendEmail (email, data) {
    return new Promise(function (resolve, reject) {
        var params = {
            Destination: { ToAddresses: [email] },
            Message: {
              Body: {
                Html: {
                  Data: data
                },
                Text: {
                  Data: data
                }
              },
              Subject: {
                Data: "myData"
              }
            },
            Source: "someone <[email protected]>",
        };

        ses.sendEmail(params, function (err, data) {
            if (err) {
                reject(err);
            } else {
                resolve(data);
            }
        });
    });
}

function process(message, callback) {
    console.log(message);

    // process message
    getRegisterSource(event['nickname'], event['user_id'])
      .then(function (data) {
        return sendEmail(event["email"], data);
      })
      .catch(function (err) {
        console.log("==ERROR==");
        callback(err, err);
      })
      .finally(function () {});

    // delete message
    const params = {
        QueueUrl: QUEUE_URL,
        ReceiptHandle: message.ReceiptHandle,
    };
    SQS.deleteMessage(params, (err) => callback(err, message));
}

function invokePoller(functionName, message) {
    const payload = {
        operation: PROCESS_MESSAGE,
        message,
    };
    const params = {
        FunctionName: functionName,
        InvocationType: 'Event',
        Payload: new Buffer(JSON.stringify(payload)),
    };
    return new Promise((resolve, reject) => {
        Lambda.invoke(params, (err) => (err ? reject(err) : resolve()));
    });
}

function poll(functionName, callback) {
    const params = {
        QueueUrl: QUEUE_URL,
        MaxNumberOfMessages: 10,
        VisibilityTimeout: 10,
    };
    // batch request messages
    SQS.receiveMessage(params, (err, data) => {
        if (err) {
            return callback(err);
        }
        // for each message, reinvoke the function
        const promises = data.Messages.map((message) => invokePoller(functionName, message));
        // complete when all invocations have been made
        Promise.all(promises).then(() => {
            const result = `Messages received: ${data.Messages.length}`;
            callback(null, result);
        });
    });
}

exports.handler = (event, context, callback) => {
    try {
        if (event.operation === PROCESS_MESSAGE) {
            console.log("Invoked by poller");
            process(event.message, callback);
        } else {
            console.log("invoked by schedule");
            poll(context.functionName, callback);
        }
    } catch (err) {
        callback(err);
    }
};

can somebody throw me some light to this?

Thanks in advice.

UPDATE

After so much misconception, I've decided to start looking on how the example of polling-SQS works provided by AWS.

There I've found that I lacked some basic SQS permissions, but solved now by adding the right policy:

{
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": [
            "lambda:InvokeFunction"
        ],
        "Resource": ["*"]
    }]
}

This allows Lambda.invoke() to call process().

When the process(message, callback) is called, if I console.log(message);, it seems that there's no message, although the queue is being cleared by the line SQS.deleteMessage(params, (err) => callback(err, message));

What I was trying was to combine my sendMail function that is currently working with a SQS service so I only have to push each message to the queue.

Upvotes: 3

Views: 2644

Answers (1)

Keet Sugathadasa
Keet Sugathadasa

Reputation: 13502

This is a common requirement where AWS SES has its own limitations in sending emails at once. If these limitations are violated, the SES account will sandbox itself. It seems like you have solved the problem using proper access credentials.

This code contains a Python3 Lambda code that can be used to handle a situation like this, where a Lambda polls from SQS using threading, and sends emails using SES, without exceeding the given limitations.

Link to Github Project.

You can also consider using the new feature in SQS, which is capable of invoking lambdas, when a new message is placed within SQS. But, be careful not to exceed the maximum number of lambda functions within the AWS Account region. (See this document)

Upvotes: 1

Related Questions