Fury

Reputation: 161

How to use async/await sequentially inside lambda function?

I am creating a Lambda function which gets data from an S3 bucket and streams it to fast-csv for parsing. After that, I need to connect to a DocumentDB database to send the parsed data.

The problem is that sometimes the database connection function runs before the parse function and inserts a blank array, and sometimes the parse function doesn't run at all, or vice versa.

So, how can I make the parse function (parserFcn) always run before the database connect-and-send function (connectToDb), so that it can get its data from the parse function?

Here is the code -

const AWS = require("aws-sdk");
const fs = require("fs");
const csv = require("@fast-csv/parse");
const MongoClient = require("mongodb").MongoClient;

const s3 = new AWS.S3();

exports.handler = async (event, context, callback) => {
  const bucketName = event.Records[0].s3.bucket.name;
  const keyName = event.Records[0].s3.object.key;

  console.log("Bucket Name->", JSON.stringify(bucketName));
  console.log("Bucket key->", JSON.stringify(keyName));

  var params = {
    Bucket: bucketName,
    Key: keyName,
  };
  var parsedData = [];
  const s3Contents = s3.getObject(params).createReadStream();

  let parserFcn = new Promise((resolve, reject) => {
    const parser = csv
      .parseStream(s3Contents, { headers: true })
      .on("data", function (data) {
        parsedData.push(data);
      })
      .on("end", (rowCount) => {
        console.log(`Parsed ${rowCount} rows`);
        resolve(parsedData);
      })
      .on("error", function () {
        reject("csv parse process failed");
      });
    return parser;
  });

  let connectToDb = new Promise((resolve, reject) => {
    var client = MongoClient.connect(
      "mongodb://user:pass@host/?ssl=true&retryWrites=false",
      {
        tlsCAFile: `/opt/rds-combined-ca-bundle.pem`, //Specify the DocDB; cert
      },
      function (err, client) {
        if (err) {
          throw err;
        } else {
          console.log("connected ");
        }
        console.log("parsedData inside conn ", parsedData);

        // Specify the database to be used
        db = client.db("database-name");

        // Specify the collection to be used
        col = db.collection("collection-name");

        // Insert Multiple document
        col.insertMany(parsedData, function (err, result) {
          if (err) {
            console.log("error->", err);
          }
          console.log("Result from db->", result);

          //Close the connection
          client.close();
        });
      }
    );
    return client;
  });

  const parserdata = await parserFcn;
  const conn = await connectToDb;

  let promiseFactories = [parserdata, conn];

  Promise.all(promiseFactories).then((data) => {
    console.log("completed all promises", data);
  });
};

Upvotes: 2

Views: 772

Answers (2)

traktor

Reputation: 19346

Here's an attempt at replacing the promise definitions in the post with functions that return a promise, as suggested by @Gamma032. You may find it useful as a guide to compare against the code you are writing and what the handler is supposed to do.

The replacement functions are not declared async because they use callbacks to resolve or reject the new promises they create and return. Awaiting the functions in call order is done inside a standard try/catch, so the code can catch await re-throwing the rejection reason of a rejected promise it was waiting on.
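
For example, the general shape of that pattern, shown here as a minimal sketch using fs.readFile as a stand-in callback API (the file path is made up), is:

const fs = require("fs");

// A plain (non-async) function that wraps a callback API in a promise;
// the callback decides whether to resolve or reject.
function readConfig(path) {
  return new Promise((resolve, reject) => {
    fs.readFile(path, "utf8", (err, text) => (err ? reject(err) : resolve(text)));
  });
}

async function run() {
  try {
    const text = await readConfig("./config.json"); // hypothetical path
    console.log("read", text.length, "characters");
  } catch (err) {
    console.error(err); // await re-threw the rejection reason
  }
}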

I left the global variables mentioned in the comments as they were, but moved the initial definition of parsedData inside the parseData function and renamed the "connection" function to updateDB, because it both connects to and updates the database.

const AWS = require("aws-sdk");
const fs = require("fs");
const csv = require("@fast-csv/parse");
const MongoClient = require("mongodb").MongoClient;

const s3 = new AWS.S3();

exports.handler = async (event, context, callback) => {
  const bucketName = event.Records[0].s3.bucket.name;
  const keyName = event.Records[0].s3.object.key;

  console.log("Bucket Name->", JSON.stringify(bucketName));
  console.log("Bucket key->", JSON.stringify(keyName));

  var params = {
    Bucket: bucketName,
    Key: keyName,
  };
  const s3Contents = s3.getObject(params).createReadStream();
  
  function parseData() {
    return new Promise((resolve, reject) => {
      const parsedData = [];
      csv.parseStream(s3Contents, { headers: true })
        .on("data", function (data) {
          parsedData.push(data);
        })
        .on("end", (rowCount) => {
          console.log(`Parsed ${rowCount} rows`);
          resolve(parsedData);
        })
        .on("error", function () {
          reject("csv parse process failed");
        });
    });
  }

  function updateDB(parsedData) {
    console.log("parsedData inside updateDB ", parsedData);
    return new Promise((resolve, reject) => {
      var client = MongoClient.connect(
        "mongodb://user:pass@host/?ssl=true&retryWrites=false",
        {
          tlsCAFile: `/opt/rds-combined-ca-bundle.pem`, //Specify the DocDB; cert
        },
        function (err, client) {
          if (err) {
            console.error( "connection failure");
            reject(err);
            return; // error return
          }
          console.log("connected ");
          // Specify the database to be used
          db = client.db("database-name");

          // Specify the collection to be used
          col = db.collection("collection-name");

          // Insert Multiple document
          col.insertMany(parsedData, function (err, result) {
            if (err) {
              console.error("insertion failure");
              reject(err);
            } else {
              resolve(result);
              // Close the connection
              client.close();
            }
          });
        }
      );
    });
  }
  
  // call parseData and updateDB in order
  try {
    const parsedData = await parseData();
    const result = await updateDB(parsedData);
    console.log("DB updated with result", result);

    // see note:
    // const promiseFactories = [parsedData, result];
    // Promise.all(promiseFactories).then((data) => {
    console.log("completed all promises", parsedData, result);
    // });
  } catch (err) {
    console.error(err); // there may already be a line on the console about the error.
  }
};

Note

An edit from the OP added

    const promiseFactories = [parsedData, result];
    Promise.all(promiseFactories).then((data) => {
     console.log("completed all promises", data);
    });

to the try clause after awaiting the values of parsedData and result. However, neither of these values is a promise (you can't fulfill a promise with a promise, and the await operator never returns a promise as the result of the await operation), so passing them through a call to Promise.all simply puts a job in the promise job queue to perform the console.log from the then handler. Logging the message after awaiting both values should suffice.
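
To illustrate with made-up values: await unwraps a promise, so the awaited results are plain values, and Promise.all of plain values just resolves with those same values one job later:

(async () => {
  // stand-ins for parsedData and result (values are made up)
  const parsedData = await Promise.resolve([{ name: "row1" }]);
  const result = await Promise.resolve({ insertedCount: 1 });

  // Promise.all of non-promise values resolves with those same values,
  // so this adds nothing over logging parsedData and result directly.
  const data = await Promise.all([parsedData, result]);
  console.log("completed all promises", data);
})();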

Upvotes: 2

Gamma032

Reputation: 451

You should await functions that return promises, not variables that hold promises.

Declaring let parserFcn = new Promise(...) and let connectToDb = new Promise(...) starts the parsing and database connection, with no guarantees on execution order.

So declare two functions:

  1. parserFcn, which returns a promise that resolves to the parsed data array.
  2. connectToDb, which takes the parsed data and pushes it to the database.

Then just call them in order:

const parsedData = await parserFcn();
await connectToDb(parsedData);
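
For reference, here is a minimal sketch of what those two functions might look like, assuming the same fast-csv stream and MongoClient setup as in the question (connectionString and connectionOptions below stand in for the connection details there; when no callback is passed, MongoClient.connect returns a promise):

function parserFcn() {
  return new Promise((resolve, reject) => {
    const parsedData = [];
    csv.parseStream(s3Contents, { headers: true })
      .on("data", (row) => parsedData.push(row))
      .on("end", () => resolve(parsedData))
      .on("error", (err) => reject(err));
  });
}

async function connectToDb(parsedData) {
  // with no callback argument, connect() returns a promise for the client
  const client = await MongoClient.connect(connectionString, connectionOptions);
  try {
    const col = client.db("database-name").collection("collection-name");
    return await col.insertMany(parsedData);
  } finally {
    client.close();
  }
}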

Upvotes: 1
