Reputation: 1545
I am trying to invoke multiple lambda functions (one lambda function that would run as separate parallel processes) from another lambda function. The first one runs as a cron lambda that just queries docs from the db and then invokes another lambda with each doc's params. This cron lambda runs every five minutes and queries the docs correctly. I was testing the second lambda with two documents. The problem is that every time the second lambda gets invoked, it only processes one document, and each time it is the one it did not process on the previous invoke:
Ex:
First invoke of second lambda -> processes doc 1
Second invoke of second lambda -> processes doc 2
Third invoke of second lambda -> processes doc 1
Fourth invoke of second lambda -> processes doc 2
etc...
First (cron) lambda code:
aws.config.update({
  region: env.lambdaRegion,
  accessKeyId: env.lambdaAccessKeyId,
  secretAccessKey: env.lambdaSecretAccessKey,
});
const lambda = new aws.Lambda({
  region: env.lambdaRegion,
});
exports.handler = async (event: any, context: any) => {
  context.callbackWaitsForEmptyEventLoop = false;
  return new Promise(async (resolve, reject) => {
    for (let i = 0; i < 100; i++) {
      const doc = await mongo.db.collection('docs')
        .findOneAndUpdate(
          {
            status: 1,
            lambdaProcessing: null,
          },
          { $set: { lambdaProcessing: new Date() } },
          {
            sort: { processedAt: 1 },
            returnNewDocument: true,
          },
        );
      if (doc.value && doc.value._id) {
        const params = {
          FunctionName: env.lambdaName,
          InvocationType: 'Event',
          Payload: JSON.stringify({ docId: doc.value._id }),
        };
        lambda.invoke(params);
      } else {
        if (doc.lastErrorObject && doc.lastErrorObject.n === 0) {
          break;
        }
      }
    }
    resolve();
  });
};
Second lambda function:
exports.handler = async (event: any, ctx: any) => {
  ctx.callbackWaitsForEmptyEventLoop = false;
  if (event && event.docId) {
    const doc = await mongo.db.collection('docs').findById(event.docId);
    return await processDoc(doc);
  } else {
    throw new Error('doc ID is not present.');
  }
};
Upvotes: 3
Views: 5118
Reputation: 1545
The key was to create a new, separate aws.Lambda() instance for every lambda we want to invoke, and then to collect a promise for every invocation and await them all (a promises array) before the main (cron) lambda resolves. Because InvocationType is 'Event', awaiting an invoke only waits for AWS to accept the request, not for the invoked lambda to finish, so we don't waste processing time on AWS: the invoked lambda starts processing on its own, and the main (cron) lambda can resolve without waiting for its response.
Fixed (cron) lambda handler:
aws.config.update({
  region: env.lambdaRegion,
  accessKeyId: env.lambdaAccessKeyId,
  secretAccessKey: env.lambdaSecretAccessKey,
});
exports.handler = async (event: any, context: any) => {
  context.callbackWaitsForEmptyEventLoop = false;
  return new Promise(async (resolve, reject) => {
    const promises: any = [];
    for (let i = 0; i < 100; i++) {
      const doc = await global['mongo'].db.collection('docs')
        .findOneAndUpdate(
          {
            status: 1,
            lambdaProcessing: null,
          },
          { $set: { lambdaProcessing: new Date() } },
          {
            sort: { processedAt: 1 },
            returnNewDocument: true,
          },
        );
      if (doc.value && doc.value._id) {
        const params = {
          FunctionName: env.lambdaName,
          InvocationType: 'Event',
          Payload: JSON.stringify({ docId: doc.value._id }),
        };
        const lambda = new aws.Lambda({
          region: env.lambdaRegion,
          maxRetries: 0,
        });
        promises.push(
          new Promise((invokeResolve, invokeReject) => {
            lambda.invoke(params, (error, data) => {
              if (error) { console.error('ERROR: ', error); }
              if (data) { console.log('SUCCESS:', data); }
              // Resolve invoke promise in any case.
              invokeResolve();
            });
          }),
        );
      } else {
        if (doc.lastErrorObject && doc.lastErrorObject.n === 0) {
          break;
        }
      }
    }
    await Promise.all(promises);
    resolve();
  });
};
Second (processing) lambda:
exports.handler = async (event: any, ctx: any) => {
  ctx.callbackWaitsForEmptyEventLoop = false;
  if (event && event.docId) {
    const doc = await mongo.db.collection('docs').findById(event.docId);
    processDoc(doc);
    return ctx.succeed('Completed.');
  } else {
    throw new Error('Doc ID is not present.');
  }
};
I don't know if there is any better way of achieving this using strictly lambda functions, but this works.
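One possibly tidier variant, as a minimal sketch rather than a drop-in replacement: in the AWS SDK for JavaScript v2, `lambda.invoke(params).promise()` returns a promise, so the manual `new Promise` wrappers can be dropped. To keep the sketch runnable without the SDK, `LambdaLike`, `InvokeParams`, and `invokeAll` are hypothetical names; `LambdaLike` only mirrors the shape of the `invoke` call used above, and the client is passed in as a parameter.

```typescript
// Minimal shape of the part of aws.Lambda used here (assumption: AWS SDK v2,
// where invoke(params) returns a request object exposing .promise()).
interface InvokeParams {
  FunctionName: string;
  InvocationType: 'Event' | 'RequestResponse';
  Payload: string;
}

interface LambdaLike {
  invoke(params: InvokeParams): { promise(): Promise<unknown> };
}

// Hypothetical helper: start one asynchronous ('Event') invocation per doc ID
// and wait until every request has either been accepted or has failed.
async function invokeAll(
  lambda: LambdaLike,
  functionName: string,
  docIds: string[],
): Promise<{ accepted: number; errors: unknown[] }> {
  const errors: unknown[] = [];
  const outcomes = await Promise.all(
    docIds.map((docId) =>
      lambda
        .invoke({
          FunctionName: functionName,
          InvocationType: 'Event',
          Payload: JSON.stringify({ docId }),
        })
        .promise()
        .then(
          () => true,
          // Record the error instead of rejecting, so one failed invoke
          // does not abort the rest of the batch.
          (error: unknown) => {
            errors.push(error);
            return false;
          },
        ),
    ),
  );
  return { accepted: outcomes.filter((ok) => ok).length, errors };
}
```

Inside the cron handler this would be called as something like `await invokeAll(lambda, env.lambdaName, claimedIds)`, where `claimedIds` (also hypothetical) holds the IDs collected by the `findOneAndUpdate` loop.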
Upvotes: 2
Reputation: 508
To run multiple lambdas in parallel without an "ugly" cronjob solution, I would recommend using AWS Step Functions with a state of type Parallel. You can set up the logic in your serverless.yml; the function calls themselves are lambda functions. You can pass data on through the second argument of callback. If the data is larger than 32 KB, I would recommend using an S3 bucket or a database instead.
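The size rule of thumb could be applied with a small guard, sketched here under stated assumptions: the 32 KB figure is simply the one quoted in this answer (check the current Step Functions quota), and `toStateOutput`, `StateOutput`, `utf8Length`, and the `store` callback are hypothetical names. `store` stands in for whatever writes the JSON to S3 or a database and returns a key; it is synchronous here only to keep the sketch short.

```typescript
// Limit quoted in the answer; treat it as a configurable assumption.
const MAX_INLINE_BYTES = 32 * 1024;

type StateOutput =
  | { kind: 'inline'; data: unknown }
  | { kind: 'reference'; key: string };

// UTF-8 byte length of a string, computed without Node- or browser-specific APIs:
// every percent-escaped byte collapses to one character before counting.
function utf8Length(text: string): number {
  return encodeURIComponent(text).replace(/%[0-9A-F]{2}/g, 'x').length;
}

// Returns the data itself when its JSON form fits the limit, otherwise stores
// the JSON elsewhere and returns only a reference to it.
// Assumes `data` is JSON-serializable.
function toStateOutput(
  data: unknown,
  store: (json: string) => string,
  maxBytes: number = MAX_INLINE_BYTES,
): StateOutput {
  const json = JSON.stringify(data);
  if (utf8Length(json) <= maxBytes) {
    return { kind: 'inline', data };
  }
  return { kind: 'reference', key: store(json) };
}
```

A handler would then pass the result on with `callback(null, toStateOutput(data, store))`, and the next state would check `kind` to decide whether to load the data from storage. The size check ignores the few bytes added by the wrapper object, so it is an approximation.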
Example serverless.yml
stepFunctions:
  stateMachines:
    test:
      name: 'test'
      definition:
        Comment: "Testing tips-like state structure"
        StartAt: GatherData
        States:
          GatherData:
            Type: Parallel
            Branches:
              -
                StartAt: GatherDataA
                States:
                  GatherDataA:
                    Type: Task
                    Resource: "arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:${self:service}-${opt:stage, self:provider.stage}-firstA"
                    TimeoutSeconds: 15
                    End: true
              -
                StartAt: GatherDataB
                States:
                  GatherDataB:
                    Type: Task
                    Resource: "arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:${self:service}-${opt:stage, self:provider.stage}-firstB"
                    TimeoutSeconds: 15
                    End: true
            Next: ResolveData
          ResolveData:
            Type: Task
            Resource: "arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:${self:service}-${opt:stage, self:provider.stage}-resolveAB"
            TimeoutSeconds: 15
            End: true
Example handlers
module.exports.firstA = (event, context, callback) => {
  const data = {
    id: 3,
    somethingElse: ['Hello', 'World'],
  };
  callback(null, data);
};
module.exports.firstB = (event, context, callback) => {
  const data = {
    id: 12,
    somethingElse: ['olleH', 'dlroW'],
  };
  callback(null, data);
};
module.exports.resolveAB = (event, context, callback) => {
  console.log("resolving data from a and b: ", event);
  const [dataFromA, dataFromB] = event;
  callback(null, event);
};
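To make the data flow concrete: a Parallel state collects the output of each branch into an array, in branch order, and passes that array as the input of the next state, which is why resolveAB can destructure `event`. The sketch below imitates that locally. `runParallel`, `callHandler`, `runStateMachine`, and the `Handler` type are hypothetical helpers for illustration and are not part of Step Functions or the Serverless Framework.

```typescript
type Callback = (error: Error | null, result?: unknown) => void;
type Handler = (event: unknown, context: unknown, callback: Callback) => void;

// Wrap a callback-style handler in a promise.
function callHandler(handler: Handler, event: unknown): Promise<unknown> {
  return new Promise<unknown>((resolve, reject) => {
    handler(event, {}, (error, result) => {
      if (error) {
        reject(error);
      } else {
        resolve(result);
      }
    });
  });
}

// Hypothetical local stand-in for a Parallel state: every branch receives the
// same input, and the outputs are gathered into one array in branch order.
function runParallel(branches: Handler[], input: unknown): Promise<unknown[]> {
  return Promise.all(branches.map((branch) => callHandler(branch, input)));
}

const firstA: Handler = (_event, _context, callback) => {
  callback(null, { id: 3, somethingElse: ['Hello', 'World'] });
};

const firstB: Handler = (_event, _context, callback) => {
  callback(null, { id: 12, somethingElse: ['olleH', 'dlroW'] });
};

// Receives [outputOfA, outputOfB] as its event and passes it through.
const resolveAB: Handler = (event, _context, callback) => {
  callback(null, event);
};

// Imitates GatherData -> ResolveData from the state machine definition.
async function runStateMachine(input: unknown): Promise<unknown> {
  const branchOutputs = await runParallel([firstA, firstB], input);
  return callHandler(resolveAB, branchOutputs);
}
```

Calling `runStateMachine({})` resolves with a two-element array: the object from firstA followed by the object from firstB.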
More information see
Upvotes: 3