Aniruddha

Reputation: 1039

Asynchronous foreach or for loop in NodeJS

I'm at a beginner level with Node.js, so kindly excuse me. The following Lambda function zips/compresses files in S3 and uploads the compressed file back to S3. listOfKeys contains the list of keys to be zipped. If you notice for (const file in listOfKeys), this runs synchronously, causing the Lambda to time out when there is a large dataset, i.e. when listOfKeys has a long list of keys. The question is: is there a way to run the loop asynchronously or in parallel, so that the files are zipped in a timely manner?

Code:

const AWS = require('aws-sdk');
const async = require('async');
const archiver = require('archiver');
const stream = require('stream');
const request = require('request');

const awsOptions = {
    region: 'us-east-1'
};
const s3 = new AWS.S3(awsOptions);

const streamTo = (bucket, key) => {
    var passthrough = new stream.PassThrough();
    s3.upload({
        Bucket: bucket,
        Key: key,
        Body: passthrough,
        ContentType: "application/zip",
    },
        (err, data) => {
            if (err) throw err;
        }
    );
    return passthrough;
};

const getStream = (bucket, key) => {
    let streamCreated = false;
    const passThroughStream = new stream.PassThrough();

    passThroughStream.on("newListener", event => {
        if (!streamCreated && event == "data") {
            const s3Stream = s3
                .getObject({ Bucket: bucket, Key: key })
                .createReadStream();
            s3Stream
                .on("error", err => passThroughStream.emit("error", err))
                .pipe(passThroughStream);

            streamCreated = true;
        }
    });
    return passThroughStream;
};

exports.handler = async (event, context, callback) => {

    let totalKeys = 0;
    const listOfKeys = [];
    const SrcBucket = event.Records[0].s3.bucket.name;
    const trigger_file = event.Records[0].s3.object.key;
    const prefix = trigger_file.split('/')[0] + '/' + trigger_file.split('/')[1] + '/';
    const dirToZip = trigger_file.split('/')[2].substr(0, trigger_file.split('/')[2].length - '.renamed'.length);
    const s3ListFilter = prefix + dirToZip;
    const destinationKey = prefix + `${dirToZip}.zip`;
    const bucketParams = {
        Bucket: SrcBucket,
        Delimiter: '/',
        Prefix: s3ListFilter + '/'
    };

    let data;
    do {
        bucketParams.Marker = (data && data.NextMarker) ? data.NextMarker : undefined;
        data = await s3.listObjects(bucketParams).promise();
        const contents = data.Contents;
        totalKeys = totalKeys + contents.length;
        listOfKeys.push(...contents.map(x => x.Key));
    } while (data.IsTruncated);

    console.log(`Total keys: ${listOfKeys.length}`);
    
    await new Promise(async (resolve, reject) => {
        var zipStream = streamTo(SrcBucket, destinationKey);
        zipStream.on("close", resolve);
        zipStream.on("end", resolve);
        zipStream.on("error", reject);
        var archive = archiver("zip");
        archive.on("error", err => {
            throw new Error(err);
        });
        archive.pipe(zipStream);

        var keysCounter = 0;
        listOfKeys.forEach(file => {
            archive.append(getStream(SrcBucket, file), { name: file.split('/')[3] })
            keysCounter++
            if (keysCounter >= Object.keys(listOfKeys).length) {
                // Called at the end of the loop
                archive.finalize();
            }
        });

        //archive.finalize();
    }).catch(err => {
        throw new Error(err);
    });

    callback(null, {
        body: { final_destination: destinationKey }
    });
};

Upvotes: 0

Views: 431

Answers (3)

steadweb

Reputation: 16541

Instead of trying to do all the zips within one lambda function, offload them using SQS, having a separate lambda to process each zip (a rough sketch of the enqueueing side follows the list below).

That way, you can:

  • Isolate failures between each zip archive
  • Run each zip process in parallel
  • Isolate processing to a single lambda invocation per zip
  • Implement a dead letter queue for messages (or zips) that cannot be handled
  • Keep single responsibility (SRP) within your application, i.e. one lambda queues the zip job and another processes it
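
For illustration, here is a minimal sketch of what the enqueueing lambda could look like; the queue URL environment variable, message shape, and handler body are assumptions added for illustration, not part of this answer. The consuming lambda (not shown) would read each message and run the existing archiving logic for that one zip.

const AWS = require('aws-sdk');

const sqs = new AWS.SQS({ region: 'us-east-1' });
// Hypothetical queue; supply the real URL via configuration.
const QUEUE_URL = process.env.ZIP_JOBS_QUEUE_URL;

exports.handler = async (event) => {
    const bucket = event.Records[0].s3.bucket.name;
    const triggerKey = event.Records[0].s3.object.key;

    // Enqueue one "zip this prefix" job instead of zipping inline,
    // so the heavy work happens in a separate, per-zip lambda.
    await sqs.sendMessage({
        QueueUrl: QUEUE_URL,
        MessageBody: JSON.stringify({ bucket, key: triggerKey })
    }).promise();

    return { body: { queued: triggerKey } };
};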

Upvotes: 1

CreaZyp154

Reputation: 425

Array.prototype.forEach()

const array1 = ['a', 'b', 'c'];

array1.forEach(element => console.log(element));

// expected output: "a"
// expected output: "b"
// expected output: "c"

So your code should be:

listOfKeys.forEach(file => {
    archive.append(getStream(SrcBucket, file), { name: file.split('/')[3] })
})

(Not sure if it works though, let me know)

Source: Array.prototype.forEach() | MDN

Edit:

archive.finalize() should be called after the loop has ended. There are several ways to do it, but I think this one should work fine. See: Callback after all asynchronous forEach callbacks are completed

// There's probably a better way to do it, but it works:
let keysCounter = 0
listOfKeys.forEach(file => {
    archive.append(getStream(SrcBucket, file), { name: file.split('/')[3] })
    keysCounter++
    if (keysCounter >= listOfKeys.length) {
        // Called at the end of the loop
        archive.finalize();
    }
})

Upvotes: 1

Jeff Breadner

Reputation: 1438

I would probably rewrite the whole thing more aggressively, but to answer your specific question: replace your listOfKeys.forEach statement with this:

await Promise
  .all(
    listOfKeys.map(key => archive.append(getStream(SrcBucket, key), { name: key.split('/')[3] }))
  );
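
For context, a rough sketch of how this could sit inside the original Promise block from the question (same streamTo/getStream helpers, with archive.finalize() still called once after all the appends). Note that archive.append() returns the archiver instance rather than a promise, so the Promise.all here mainly keeps the flow explicit while archiver streams the entries as the zip is written; treat this as an outline, not a verified fix:

await new Promise(async (resolve, reject) => {
    const zipStream = streamTo(SrcBucket, destinationKey);
    zipStream.on("close", resolve);
    zipStream.on("end", resolve);
    zipStream.on("error", reject);

    const archive = archiver("zip");
    archive.on("error", reject);
    archive.pipe(zipStream);

    // Queue every key on the archive, then finalize once.
    await Promise.all(
        listOfKeys.map(key =>
            archive.append(getStream(SrcBucket, key), { name: key.split('/')[3] })
        )
    );

    archive.finalize();
}).catch(err => {
    throw new Error(err);
});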

Upvotes: 1
