Nuno Vivas

Reputation: 33

Handling of rabbit messages via NESTJs microservice issue

I'm currently having a problem that I'm unable to solve. It only happens in extreme cases: when the server goes offline for some reason, messages accumulate (100k or more) and then all need to be processed at once. Even though I'm planning for this never to happen, I would like a backup plan for it and more control over the situation. I'm running a NestJS microservice against a RabbitMQ broker to receive messages that arrive from IoT devices and insert them into a MySQL database. Every message needs a small conversion/translation step before the insert. This conversion is based on a single-row query against a table on the same SQL server.

The order is the following:

Now, I'm facing this error:

(node:1129233) UnhandledPromiseRejectionWarning: SequelizeConnectionAcquireTimeoutError: Operation timeout
            at ConnectionManager.getConnection (/home/nunovivas/NestJSProjects/integrador/node_modules/sequelize/lib/dialects/abstract/connection-manager.js:288:48)
            at runNextTicks (internal/process/task_queues.js:60:5)
            at listOnTimeout (internal/timers.js:526:9)
            at processTimers (internal/timers.js:500:7)
            at /home/nunovivas/NestJSProjects/integrador/node_modules/sequelize/lib/sequelize.js:613:26
            at MySQLQueryInterface.select (/home/nunovivas/NestJSProjects/integrador/node_modules/sequelize/lib/dialects/abstract/query-interface.js:953:12)
            at Function.findAll (/home/nunovivas/NestJSProjects/integrador/node_modules/sequelize/lib/model.js:1752:21)
            at Function.findOne (/home/nunovivas/NestJSProjects/integrador/node_modules/sequelize/lib/model.js:1916:12)
        node_modules/source-map-support/source-map-support.js:516
        (node:1129233) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). To terminate the node process on unhandled promise rejection, use the CLI flag `--unhandled-rejections=strict` (see https://nodejs.org/api/cli.html#cli_unhandled_rejections_mode). (rejection id: 1349)

I think the promise rejection originates inside the Sequelize module.

This is my sequelize configuration:

useFactory: async (ConfigService: ConfigService) => ({
                dialect: 'mysql',
                host: 'someserver',
                port: 3306,
                username: 'dede',
                password: 'dudu!',
                database: 'dada',
                autoLoadModels: true,
                pool: { max: 5, min: 0, adquire: 1800000, idle: 5000 }, 
                synchronize: true,
                logQueryParameters: true,

This is part of my message service:

@RabbitRPC({
            exchange: 'BACKEND_MAINEXCHANGE',
            routingKey: 'Facility_DeviceReadings',
            queue: 'Facility_DeviceReadings',
          })
          public async rpcHandlerDeviceReadings(mensagem: ReadingDevicesPerFacility) {
            const schemavalid = mensagem;
            this.mylogger.log(    'Received message from BACKEND_MAINEXCHANGE - listening to the queue Facility_DeviceReadings : ' +
            ' was registered locally on ' +
            schemavalid.DateTimeRegistered,
          MessagingService.name,
          'rpcHandlerDeviceReadings',
        );
        if (schemavalid) {
          try {
              let finalschema = new CreateReadingDevicesDto();
              if (element.Slot > 0) {
        const result = this.readingTransService
                  .findOneByPlcId(element.deviceId, element.Slot)
                  .then((message) => {
                    if (!message) {
                      throw new NotFoundException('Message with ID not found');
                    } else {
                      finalschema.deviceId = message.deviceId;
                      finalschema.Slot = message.Slot2;
                      if (this.isNumeric(element.ReadingValue)) {
                        finalschema.ReadingValue = element.ReadingValue;
                        finalschema.DateTimeRegistered =
                          schemavalid.DateTimeRegistered;
                        this.readingDeviceService
                          .create(finalschema)
                          .then((message) => {
                            this.mylogger.debug(
                              'Saved',
                              MessagingService.name,
                              'rpcHandlerDeviceReadings',
                            );
                            return 42;
                          });
                      } else {
                        this.mylogger.error(
                          'error',
                          MessagingService.name,
                          'rpcHandlerDeviceReadings',
                        );
                      }
                      return message;
                    }
                  });

The problem seems to be that this RPC handler keeps reading/consuming messages from RabbitMQ (8 per millisecond) before SQL has a chance to reply, forcing Sequelize into a state it can't handle and thus throwing the above error. I have tried tweaking the Sequelize config, but with no good outcome. Is there any way to force the RPC handler to take the next message only after the previous one has been processed? I would love it if someone could steer me in the right direction, since this could eventually become a breaking issue.

Thanks in advance for any input you can give me.

Upvotes: 1

Views: 1503

Answers (1)

O. Jones

Reputation: 108841

It looks to me like your Sequelize connection pool options need some tweaking.

You have

   pool: { max: 5, min: 0, adquire: 1800000, idle: 5000 }

adquire isn't a pool option; the option is spelled acquire. Also, half an hour (1.8 million milliseconds) is a really long time to wait for a connection. Consider shortening it: acquire: 300000 will give you five minutes. A big production app such as yours should probably always keep one or two connections open, so increase min to 1 or 2.
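As a sketch of the pool settings suggested above: the `PoolOptions` interface below is a hypothetical stand-in written out here so the snippet is self-contained, and the numbers are illustrative rather than tuned for your workload.

```typescript
// Hypothetical local type mirroring the four pool keys used in the question.
interface PoolOptions {
  max: number;     // upper bound on open connections
  min: number;     // connections kept open even when idle
  acquire: number; // ms to wait for a free connection before giving up
  idle: number;    // ms a connection may sit unused before it is released
}

// Same values as in the question, with the key spelled "acquire",
// a five-minute wait instead of thirty, and min raised to 2.
const pool: PoolOptions = {
  max: 5,
  min: 2,
  acquire: 300000,
  idle: 5000,
};
```

Typing the object this way also means a misspelled key such as adquire is rejected at compile time instead of being silently accepted.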

A modest maximum number of connections is good as long as each operation grabs a connection from the pool, uses it, and releases it. If your operation grabs a connection and then awaits something external, you'll need more connections.
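One way to keep the number of in-flight database operations at or below the pool size is to run them through a small limiter. This is a minimal sketch, not something from your code: `ConcurrencyLimiter` and `run` are hypothetical names, and it assumes each task is an async function that uses and releases one connection.

```typescript
// Hypothetical helper: lets at most `limit` async tasks run at the same time.
class ConcurrencyLimiter {
  private active = 0;
  private readonly waiting: Array<() => void> = [];
  private readonly limit: number;

  constructor(limit: number) {
    this.limit = limit;
  }

  async run<T>(task: () => Promise<T>): Promise<T> {
    if (this.active >= this.limit) {
      // No free slot: wait until a finishing task hands its slot over.
      await new Promise<void>((resolve) => this.waiting.push(resolve));
    } else {
      this.active++;
    }
    try {
      return await task();
    } finally {
      const next = this.waiting.shift();
      if (next) {
        next(); // pass the slot directly to the next waiter
      } else {
        this.active--; // nobody waiting, so free the slot
      }
    }
  }
}

// Possible usage (assumed names from the question):
// const limiter = new ConcurrencyLimiter(5); // match pool.max
// await limiter.run(() => this.readingTransService.findOneByPlcId(id, slot));
```

This only bounds database work inside the process. The broker may still deliver messages faster than they are written, so waiting tasks will accumulate in memory during a large backlog.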

If it's possible to get your program to read in a whole bunch of messages (at least 10) at a time and then put them into your database in one go with bulkCreate(), you'll speed things up. A lot. That's because inserts are cheap, but the commit operations after those inserts aren't so cheap. So doing multiple inserts within a single transaction, then committing them all at once, can make things dramatically faster. Read about autocommit for more information on this.

Writing your service to chow down on a big message backlog quickly will make errors like the one you showed us less likely.

Edit: To use .bulkCreate() you need to accumulate multiple incoming messages. Try something like this.

  1. Create an array of your received CreateReadingDevicesDto messages. let incomingMessages = []

  2. Instead of using .create() to put each new message into your database as you finish receiving and validating it, put it into your array. incomingMessages.push(finalschema).

  3. Set up a JavaScript interval to take the data from the array and put it into your database with .bulkCreate(). This will do that every 500ms.

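    /* arrow function, so `this` is still the surrounding service */
    setInterval(() => {
      if (incomingMessages.length > 0) {
        /* take the current batch and empty the array for new arrivals */
        const batch = incomingMessages
        incomingMessages = []
        /* create all the items in the batch */
        this.readingDeviceService
          .bulkCreate(batch)
          .catch((err) => console.error(err))
      }
    }, 500);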
    

    At the cost of somewhere between 0 and 500ms extra latency, this batches up your messages and will let you process your backlog faster.

I haven't debugged this, and it's probably a little more crude than you want in production code. But I have used similar techniques to good effect.
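A somewhat more defensive variant of the batching idea above could look like the sketch below. `MessageBatcher`, `drain` and `maxBatchSize` are hypothetical names, not part of NestJS or Sequelize, and the sketch assumes the write function (for example a wrapper around the bulkCreate() call) returns a promise. It caps the batch size, avoids overlapping writes, and puts a failed batch back so it is retried on the next tick.

```typescript
// Hypothetical helper: buffers items and hands them to `flush` in batches.
class MessageBatcher<T> {
  private buffer: T[] = [];
  private flushing = false;
  private readonly flush: (batch: T[]) => Promise<unknown>;
  private readonly maxBatchSize: number;

  constructor(flush: (batch: T[]) => Promise<unknown>, maxBatchSize = 1000) {
    this.flush = flush;
    this.maxBatchSize = maxBatchSize;
  }

  // Number of items still waiting to be written.
  get pending(): number {
    return this.buffer.length;
  }

  add(item: T): void {
    this.buffer.push(item);
  }

  // Writes at most one batch and returns how many items were written.
  // Does nothing if the buffer is empty or a write is still in progress.
  async drain(): Promise<number> {
    if (this.flushing || this.buffer.length === 0) return 0;
    this.flushing = true;
    const batch = this.buffer.splice(0, this.maxBatchSize);
    try {
      await this.flush(batch);
      return batch.length;
    } catch (err) {
      // Put the failed batch back at the front so the next drain retries it.
      this.buffer = batch.concat(this.buffer);
      throw err;
    } finally {
      this.flushing = false;
    }
  }
}

// Possible wiring (assumed names from the question and answer):
// const batcher = new MessageBatcher<CreateReadingDevicesDto>(
//   (batch) => this.readingDeviceService.bulkCreate(batch),
// );
// setInterval(() => batcher.drain().catch((e) => console.error(e)), 500);
```

The handler would then call `batcher.add(finalschema)` instead of `.create(finalschema)`. Retrying a failed batch forever is a simplification; production code would likely want a retry limit or a dead-letter path.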

Upvotes: 2
