Reputation: 1039
I'm at a beginner level with Node.js, so kindly excuse me. The following Lambda function zips/compresses files in S3 and uploads the compressed file back to S3. listOfKeys
contains the list of keys to be zipped. If you notice, the loop over listOfKeys
runs synchronously, causing the Lambda to time out when there is a large dataset, i.e. when listOfKeys
holds a long list of keys. The question is: is there a way to run the loop asynchronously or in parallel, so that the files are zipped in a timely manner?
Code:
const AWS = require('aws-sdk');
const async = require('async');
const archiver = require('archiver');
const stream = require('stream');
const request = require('request');

const awsOptions = {
  region: 'us-east-1'
};
const s3 = new AWS.S3(awsOptions);

// Returns a PassThrough stream; whatever is written to it is uploaded to S3.
const streamTo = (bucket, key) => {
  const passthrough = new stream.PassThrough();
  s3.upload(
    {
      Bucket: bucket,
      Key: key,
      Body: passthrough,
      ContentType: "application/zip",
    },
    (err, data) => {
      // Surface upload failures on the stream so the caller's error
      // handler sees them (throwing here would not reach the caller).
      if (err) passthrough.emit("error", err);
    }
  );
  return passthrough;
};

// Lazily wraps an S3 object in a stream: the GetObject request is only
// issued once a "data" listener is attached, i.e. when archiver starts
// consuming the entry.
const getStream = (bucket, key) => {
  let streamCreated = false;
  const passThroughStream = new stream.PassThrough();
  passThroughStream.on("newListener", event => {
    if (!streamCreated && event == "data") {
      const s3Stream = s3
        .getObject({ Bucket: bucket, Key: key })
        .createReadStream();
      s3Stream
        .on("error", err => passThroughStream.emit("error", err))
        .pipe(passThroughStream);
      streamCreated = true;
    }
  });
  return passThroughStream;
};

exports.handler = async (event, context, callback) => {
  let totalKeys = 0;
  const listOfKeys = [];
  const SrcBucket = event.Records[0].s3.bucket.name;
  const trigger_file = event.Records[0].s3.object.key;
  const prefix = trigger_file.split('/')[0] + '/' + trigger_file.split('/')[1] + '/';
  const dirToZip = trigger_file.split('/')[2].substr(0, trigger_file.split('/')[2].length - '.renamed'.length);
  const s3ListFilter = prefix + dirToZip;
  const destinationKey = prefix + `${dirToZip}.zip`;
  const bucketParams = {
    Bucket: SrcBucket,
    Delimiter: '/',
    Prefix: s3ListFilter + '/'
  };

  // Page through listObjects until every key under the prefix is collected.
  let data;
  do {
    bucketParams.Marker = (data && data.NextMarker) ? data.NextMarker : undefined;
    data = await s3.listObjects(bucketParams).promise();
    const contents = data.Contents;
    totalKeys = totalKeys + contents.length;
    listOfKeys.push(...contents.map(x => x.Key));
  } while (data.IsTruncated);
  console.log(`Total keys: ${listOfKeys.length}`);

  await new Promise((resolve, reject) => {
    const zipStream = streamTo(SrcBucket, destinationKey);
    zipStream.on("close", resolve);
    zipStream.on("end", resolve);
    zipStream.on("error", reject);

    const archive = archiver("zip");
    // Reject the promise; throwing inside an event handler would not do that.
    archive.on("error", reject);
    archive.pipe(zipStream);

    let keysCounter = 0;
    listOfKeys.forEach(file => {
      archive.append(getStream(SrcBucket, file), { name: file.split('/')[3] });
      keysCounter++;
      if (keysCounter >= listOfKeys.length) {
        // Called at the end of the loop
        archive.finalize();
      }
    });
    //archive.finalize();
  }).catch(err => {
    throw new Error(err);
  });

  callback(null, {
    body: { final_destination: destinationKey }
  });
};
Upvotes: 0
Views: 431
Reputation: 16541
Instead of trying to do them all within one Lambda function, offload them using SQS, with a separate Lambda invocation processing each zip.
That way each zip job is isolated: one slow or oversized archive can't time out the rest, and failed messages can be retried independently.
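For example, here's a minimal sketch of that fan-out (the queue URL, message shape, and the enqueueZipJob helper are my own assumptions, not anything from the question): the triggering Lambda sends one SQS message per zip job, and a worker Lambda subscribed to the queue builds a single archive per message.
// Hypothetical fan-out: the trigger Lambda enqueues one message per zip job.
const AWS = require('aws-sdk');
const sqs = new AWS.SQS({ region: 'us-east-1' });

// QUEUE_URL is an assumed environment variable pointing at your SQS queue.
const enqueueZipJob = (bucket, prefix, destinationKey) =>
  sqs.sendMessage({
    QueueUrl: process.env.QUEUE_URL,
    MessageBody: JSON.stringify({ bucket, prefix, destinationKey })
  }).promise();

// The worker Lambda, triggered by the queue, handles one zip per message.
exports.worker = async (event) => {
  for (const record of event.Records) {
    const { bucket, prefix, destinationKey } = JSON.parse(record.body);
    // ...list the keys under prefix and stream them into one zip,
    // exactly as the archiver code in the question does.
  }
};
Each invocation then only has to stay inside the timeout for its own archive.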
Upvotes: 1
Reputation: 425
Array.prototype.forEach() calls the callback once for each element of the array, passing the element itself (not its index):
const array1 = ['a', 'b', 'c'];
array1.forEach(element => console.log(element));
// expected output: "a"
// expected output: "b"
// expected output: "c"
So your code should be:
listOfKeys.forEach(file => {
  archive.append(getStream(SrcBucket, file), { name: file.split('/')[3] })
})
(Not sure if it works though, let me know)
Source: Array.prototype.forEach() | MDN
So archive.finalize()
should be called after the loop has ended. There are several ways to do it, but I think this one should work fine. See: Callback after all asynchronous forEach callbacks are completed
// There's probably a better way to do it, but this works:
let keysCounter = 0;
listOfKeys.forEach(file => {
  archive.append(getStream(SrcBucket, file), { name: file.split('/')[3] })
  keysCounter++
  if (keysCounter >= listOfKeys.length) {
    // Called at the end of the loop
    archive.finalize();
  }
})
Upvotes: 1
Reputation: 1438
I would probably rewrite the whole thing more aggressively, but to answer your specific question: replace your listOfKeys.forEach
statement with this:
await Promise.all(
  listOfKeys.map(key => archive.append(getStream(SrcBucket, key), { name: key.split('/')[3] }))
);
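Note that archive.finalize() still needs to be called once everything has been appended; with this replacement it moves to just after the await, roughly like this:
await Promise.all(
  listOfKeys.map(key => archive.append(getStream(SrcBucket, key), { name: key.split('/')[3] }))
);
// No more entries are coming; let archiver flush the zip.
archive.finalize();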
Upvotes: 1