Anže Mur

Reputation: 1545

Invoking multiple AWS Lambdas doesn't create parallel processes

I am trying to invoke multiple Lambda functions (really one Lambda function, invoked several times so that it runs as separate parallel processes) from another Lambda function. The first one runs as a cron Lambda that just queries docs from the DB and then invokes the second Lambda with each doc's params. This cron Lambda runs every five minutes and queries the docs correctly. I was testing the second Lambda with two documents. The problem is that every time the second Lambda gets invoked it processes only one document, and each time it is the one it didn't process on the previous invocation:

Example:

First invocation of the second Lambda -> processes doc 1

Second invocation of the second Lambda -> processes doc 2

Third invocation of the second Lambda -> processes doc 1

Fourth invocation of the second Lambda -> processes doc 2

etc...

First (cron) lambda code:

aws.config.update({
  region : env.lambdaRegion,
  accessKeyId: env.lambdaAccessKeyId,
  secretAccessKey: env.lambdaSecretAccessKey,
});

const lambda = new aws.Lambda({
  region: env.lambdaRegion,
});

exports.handler = async (event: any, context: any) => {
  context.callbackWaitsForEmptyEventLoop = false;

  return new Promise(async (resolve, reject) => {
    for (let i = 0; i < 100; i++) {
      const doc = await mongo.db.collection('docs').
        findOneAndUpdate(
          {
            status: 1,
            lambdaProcessing: null,
          },
          { $set: { lambdaProcessing: new Date() } },
          {
            sort: { processedAt: 1 },
            returnNewDocument: true,
          },
        );

      if (doc.value && doc.value._id) {
        const params = {
          FunctionName: env.lambdaName,
          InvocationType: 'Event',
          Payload: JSON.stringify({ docId: doc.value._id }),
        };

        lambda.invoke(params);
      } else {
        if (doc.lastErrorObject && doc.lastErrorObject.n === 0) {
          break;
        }
      }
    }
    resolve();
  });
};

Second lambda function:

exports.handler = async (event: any, ctx: any) => {
  ctx.callbackWaitsForEmptyEventLoop = false;

  if (event && event.docId) {
    const doc = await mongo.db.collection('docs').findById(event.docId);
    return await processDoc(doc);
  } else {
    throw new Error('doc ID is not present.');
  }
};

Upvotes: 3

Views: 5118

Answers (2)

Anže Mur

Reputation: 1545

The key was to create a new, separate aws.Lambda() instance for every Lambda we want to invoke, and then to collect a promise for every invocation in an array and await them all before the handler resolves. This is fine when the results of the invoked Lambdas don't need to be awaited: with InvocationType 'Event' the invoked Lambda starts processing and the invoke call resolves without waiting for its response, so the main (cron) Lambda can resolve and we don't waste processing time on AWS.

Fixed (cron) lambda handler:

aws.config.update({
  region : env.lambdaRegion,
  accessKeyId: env.lambdaAccessKeyId,
  secretAccessKey: env.lambdaSecretAccessKey,
});

exports.handler = async (event: any, context: any) => {
  context.callbackWaitsForEmptyEventLoop = false;

  return new Promise(async (resolve, reject) => {
    const promises: any = [];
    for (let i = 0; i < 100; i++) {
      const doc = await global['mongo'].db.collection('docs').
        findOneAndUpdate(
          {
            status: 1,
            lambdaProcessing: null,
          },
          { $set: { lambdaProcessing: new Date() } },
          {
            sort: { processedAt: 1 },
            returnNewDocument: true,
          },
        );

      if (doc.value && doc.value._id) {
        const params = {
          FunctionName: env.lambdaName,
          InvocationType: 'Event',
          Payload: JSON.stringify({ docId: doc.value._id }),
        };

        const lambda = new aws.Lambda({
          region: env.lambdaRegion,
          maxRetries: 0,
        });

        promises.push(
          new Promise((invokeResolve, invokeReject) => {
            lambda.invoke(params, (error, data) => {
              if (error) { console.error('ERROR: ', error); }
              if (data) { console.log('SUCCESS:', data); }
              // Resolve invoke promise in any case.
              invokeResolve();
            });
          }),
        );
      } else {
        if (doc.lastErrorObject && doc.lastErrorObject.n === 0) {
          break;
        }
      }
    }
    await Promise.all(promises);
    resolve();
  });
};

Second (processing) lambda:

exports.handler = async (event: any, ctx: any) => {
  ctx.callbackWaitsForEmptyEventLoop = false;

  if (event && event.docId) {
    const doc = await mongo.db.collection('docs').findById(event.docId);
    processDoc(doc);
    return ctx.succeed('Completed.');
  } else {
    throw new Error('Doc ID is not present.');
  }
};

I don't know if there is any better way of achieving this using strictly lambda functions, but this works.
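
For what it's worth, the same idea can probably be written more compactly with the .promise() method that AWS SDK for JavaScript v2 requests expose. As far as I know, in v2 a call such as lambda.invoke(params) without a callback only builds a request object and does not send it until .send() or .promise() is called, which may be why the bare call in the question appeared to do nothing. The following is only a sketch under that assumption: invokeAll and makeFakeClient are hypothetical names, and the fake client stands in for new aws.Lambda(...) so the snippet runs without AWS.

```javascript
// Sketch only: `client` is anything shaped like the AWS SDK v2 Lambda client,
// i.e. invoke(params) returns an object with a .promise() method.
async function invokeAll(client, functionName, docIds) {
  return Promise.all(
    docIds.map((docId) =>
      client
        .invoke({
          FunctionName: functionName,
          InvocationType: 'Event', // asynchronous: does not wait for the result
          Payload: JSON.stringify({ docId }),
        })
        .promise()
        .then((data) => ({ docId, ok: true, data }))
        // Record the failure instead of rejecting, like the handler above.
        .catch((error) => ({ docId, ok: false, error })),
    ),
  );
}

// Hypothetical stand-in for the real client, used only to exercise the sketch.
function makeFakeClient(failingDocId) {
  const sent = [];
  return {
    sent,
    invoke(params) {
      return {
        promise: async () => {
          const { docId } = JSON.parse(params.Payload);
          if (docId === failingDocId) {
            throw new Error('simulated failure');
          }
          sent.push(docId);
          return { StatusCode: 202 };
        },
      };
    },
  };
}
```

In the cron handler you would collect the claimed doc IDs in the loop and then call something like await invokeAll(lambda, env.lambdaName, docIds) once, instead of building the promises array by hand.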

Upvotes: 2

wfreude

Reputation: 508

To run multiple Lambdas in parallel without an "ugly" cron job solution, I would recommend using AWS Step Functions with a state of type Parallel. You can set up the logic in your serverless.yml; the function calls themselves are Lambda functions. You can pass data on via the second argument of callback. If the data is larger than 32 KB, though, I would recommend using an S3 bucket or a database.
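
To make the size remark concrete, here is a rough sketch of how a handler could decide whether to return its data directly or store it elsewhere and return only a reference. The helper names are hypothetical, and the limit constant simply reuses the figure quoted above, so check the current quota before relying on it.

```javascript
// Limit quoted above; treat it as an assumption and verify the current quota.
const INLINE_LIMIT_BYTES = 32 * 1024;

// Size of the JSON-serialised payload in bytes (not characters).
function payloadSizeBytes(data) {
  return Buffer.byteLength(JSON.stringify(data), 'utf8');
}

// True when the data should go to S3 or a database and only a key be passed on.
function shouldPassByReference(data, limit = INLINE_LIMIT_BYTES) {
  return payloadSizeBytes(data) > limit;
}
```

A task could call shouldPassByReference(data) just before callback(null, ...) and pass either the data itself or a storage key.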

Example serverless.yml

stepFunctions:
  stateMachines:
    test:
      name: 'test'
      definition:
        Comment: "Testing tips-like state structure"
        StartAt: GatherData
        States:
          GatherData:
            Type: Parallel
            Branches:
              -
                StartAt: GatherDataA
                States:
                  GatherDataA:
                    Type: Task
                    Resource: "arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:${self:service}-${opt:stage, self:provider.stage}-firstA"
                    TimeoutSeconds: 15
                    End: true
              -
                StartAt: GatherDataB
                States:
                  GatherDataB:
                    Type: Task
                    Resource: "arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:${self:service}-${opt:stage, self:provider.stage}-firstB"
                    TimeoutSeconds: 15
                    End: true
            Next: ResolveData
          ResolveData:
            Type: Task
            Resource: "arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:${self:service}-${opt:stage, self:provider.stage}-resolveAB"
            TimeoutSeconds: 15
            End: true

Example handlers

module.exports.firstA = (event, context, callback) => {
  const data = {
    id: 3,
    somethingElse: ['Hello', 'World'],
  };
  callback(null, data);
};
module.exports.firstB = (event, context, callback) => {
  const data = {
    id: 12,
    somethingElse: ['olleH', 'dlroW'],
  };
  callback(null, data);
};

module.exports.resolveAB = (event, context, callback) => {
  console.log("resolving data from a and b: ", event);
  const [dataFromA, dataFromB] = event;
  callback(null, event);
};
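
If it helps to see why resolveAB can destructure event as an array: a Parallel state gives every branch the same input and collects the branch outputs into an array in branch order. The following is only a local simulation of that behaviour, not Step Functions itself; run and runParallel are hypothetical helpers, and the two handlers are trimmed copies of the ones above.

```javascript
// Trimmed copies of the handlers above.
const firstA = (event, context, callback) => callback(null, { id: 3 });
const firstB = (event, context, callback) => callback(null, { id: 12 });

// Wrap a callback-style handler in a promise.
const run = (handler, event) =>
  new Promise((resolve, reject) => {
    handler(event, {}, (error, data) => (error ? reject(error) : resolve(data)));
  });

// Local stand-in for a Parallel state: each branch receives the same input,
// and the outputs come back as an array in the order the branches are listed.
async function runParallel(branches, input) {
  return Promise.all(branches.map((branch) => run(branch, input)));
}
```

Calling runParallel([firstA, firstB], someInput) resolves to the array that the next state (ResolveData here) would receive as its event.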

More information see

Upvotes: 3
