Aleksandr Yatsenko
Aleksandr Yatsenko

Reputation: 869

How to make waiting for the completion of actions, then receive a new message?

I'm creating microservice by nestjs, transfer throw rabbitmq. How to make microservice receive messages from queue in turn waiting for complete of the previous one.

import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { Transport } from '@nestjs/microservices';

async function bootstrap() {
  const app = await NestFactory.createMicroservice(AppModule, {
    transport: Transport.RMQ,
    options: {
      urls: [`amqp://localhost:5672`],
      queue: 'rmq_queue',
      queueOptions: { durable: false },
      prefetchCount: 1,
    },
  });

  await app.listenAsync();
}

bootstrap();

import { Controller, Logger } from '@nestjs/common';
import { EventPattern } from '@nestjs/microservices';

@Controller()
export class AppController {
  @EventPattern('hello')
  async handleHello(): Promise<void> {
    Logger.log('-handle-');
    await (new Promise(resolve => setTimeout(resolve, 5000)));
    Logger.log('---hello---');
  }
}
const { ClientRMQ } = require('@nestjs/microservices');

(async () => {
  const client = new ClientRMQ({
    urls: ['amqp://localhost:5672'],
    queue: 'rmq_queue',
    queueOptions: { durable: false },
  });

  await client.connect();

  for (let i = 0; i < 3; i++) {
    client.emit('hello', 0).subscribe();
  }
})();

https://github.com/heySasha/nest-rmq

Actual output:

[Nest] 9560   - 05/14/2019, 1:53 PM   -handle- +2ms
[Nest] 9560   - 05/14/2019, 1:53 PM   -handle- +9ms
[Nest] 9560   - 05/14/2019, 1:53 PM   -handle- +12ms
[Nest] 9560   - 05/14/2019, 1:54 PM   ---hello--- +4967ms
[Nest] 9560   - 05/14/2019, 1:54 PM   ---hello--- +2ms
[Nest] 9560   - 05/14/2019, 1:54 PM   ---hello--- +1ms

But i expect:

[Nest] 9560   - 05/14/2019, 1:53 PM   -handle- +2ms
[Nest] 9560   - 05/14/2019, 1:54 PM   ---hello--- +5067ms
[Nest] 9560   - 05/14/2019, 1:53 PM   -handle- +2ms
[Nest] 9560   - 05/14/2019, 1:54 PM   ---hello--- +5067ms
[Nest] 9560   - 05/14/2019, 1:53 PM   -handle- +2ms
[Nest] 9560   - 05/14/2019, 1:54 PM   ---hello--- +5067ms

Upvotes: 5

Views: 6626

Answers (3)

Ivan Kazanovskiy
Ivan Kazanovskiy

Reputation: 21

You should add noAck: false to main.ts.

import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { Transport } from '@nestjs/microservices';

async function bootstrap() {
  const app = await NestFactory.createMicroservice(AppModule, {
    transport: Transport.RMQ,
    options: {
      urls: [`amqp://localhost:5672`],
      queue: 'rmq_queue',
      queueOptions: { durable: false },
      noAck: false,
      prefetchCount: 1,
    },
  });

  await app.listenAsync();
}

bootstrap();

Additionally, you have to add context to the controller and make the acknowledgment.

import { Controller, Logger } from '@nestjs/common';
import {  Ctx, EventPattern, RmqContext } from '@nestjs/microservices';

@Controller()
export class AppController {
  @EventPattern('hello')
  async handleHello(@Ctx() context: RmqContext): Promise<void> {
    Logger.log('-handle-');
    await (new Promise(resolve => setTimeout(resolve, 5000)));
    Logger.log('---hello---');

    const channel = context.getChannelRef();
    const originalMsg = context.getMessage();
    channel.ack(originalMsg);
  }
}

Upvotes: 2

Aleksandr Yatsenko
Aleksandr Yatsenko

Reputation: 869

I have written custom strategy.

import { isString, isUndefined } from '@nestjs/common/utils/shared.utils';
import { Observable } from 'rxjs';
import { CustomTransportStrategy, RmqOptions, Server } from '@nestjs/microservices';
import {
    CONNECT_EVENT, DISCONNECT_EVENT, DISCONNECTED_RMQ_MESSAGE, NO_MESSAGE_HANDLER,
    RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT,
    RQM_DEFAULT_PREFETCH_COUNT,
    RQM_DEFAULT_QUEUE, RQM_DEFAULT_QUEUE_OPTIONS,
    RQM_DEFAULT_URL,
} from '@nestjs/microservices/constants';

let rqmPackage: any = {};

export class ServerRMQ extends Server implements CustomTransportStrategy {
    private server: any = null;
    private channel: any = null;
    private readonly urls: string[];
    private readonly queue: string;
    private readonly prefetchCount: number;
    private readonly queueOptions: any;
    private readonly isGlobalPrefetchCount: boolean;

    constructor(private readonly options: RmqOptions['options']) {
        super();
        this.urls = this.getOptionsProp(this.options, 'urls') || [RQM_DEFAULT_URL];
        this.queue =
            this.getOptionsProp(this.options, 'queue') || RQM_DEFAULT_QUEUE;
        this.prefetchCount =
            this.getOptionsProp(this.options, 'prefetchCount') ||
            RQM_DEFAULT_PREFETCH_COUNT;
        this.isGlobalPrefetchCount =
            this.getOptionsProp(this.options, 'isGlobalPrefetchCount') ||
            RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT;
        this.queueOptions =
            this.getOptionsProp(this.options, 'queueOptions') ||
            RQM_DEFAULT_QUEUE_OPTIONS;

        this.loadPackage('amqplib', ServerRMQ.name, () => require('amqplib'));
        rqmPackage = this.loadPackage(
            'amqp-connection-manager',
            ServerRMQ.name,
            () => require('amqp-connection-manager'),
        );
    }

    public async listen(callback: () => void): Promise<void> {
        await this.start(callback);
    }

    public close(): void {
        if (this.channel) {
            this.channel.close();
        }

        if (this.server) {
            this.server.close();
        }
    }

    public async start(callback?: () => void) {
        this.server = this.createClient();
        this.server.on(CONNECT_EVENT, (_: any) => {
            this.channel = this.server.createChannel({
                json: false,
                setup: (channel: any) => this.setupChannel(channel, callback),
            });
        });
        this.server.on(DISCONNECT_EVENT, (err: any) => {
            this.logger.error(DISCONNECTED_RMQ_MESSAGE);
        });
    }

    public createClient<T = any>(): T {
        const socketOptions = this.getOptionsProp(this.options, 'socketOptions');
        return rqmPackage.connect(this.urls, socketOptions);
    }

    public async setupChannel(channel: any, callback: () => void) {
        await channel.assertQueue(this.queue, this.queueOptions);
        await channel.prefetch(this.prefetchCount, this.isGlobalPrefetchCount);
        channel.consume(
            this.queue,
            (msg: any) => this.handleMessage(msg)
                .then(() => this.channel.ack(msg)) // Ack message after complete
                .catch(err => {
                    // error handling
                    this.logger.error(err);
                    return this.channel.ack(msg);
                }),
            { noAck: false },
        );
        callback();
    }

    public async handleMessage(message: any): Promise<void> {
        const { content, properties } = message;
        const packet = JSON.parse(content.toString());
        const pattern = isString(packet.pattern)
            ? packet.pattern
            : JSON.stringify(packet.pattern);

        if (isUndefined(packet.id)) {
            return this.handleEvent(pattern, packet);
        }

        const handler = this.getHandlerByPattern(pattern);

        if (!handler) {
            const status = 'error';

            return this.sendMessage(
                { status, err: NO_MESSAGE_HANDLER },
                properties.replyTo,
                properties.correlationId,
            );
        }

        const response$ = this.transformToObservable(
            await handler(packet.data),
        ) as Observable<any>;

        const publish = <T>(data: T) =>
            this.sendMessage(data, properties.replyTo, properties.correlationId);

        if (response$) {
            this.send(response$, publish);
        }

    }

    public sendMessage<T = any>(
        message: T,
        replyTo: any,
        correlationId: string,
    ): void {
        const buffer = Buffer.from(JSON.stringify(message));
        this.channel.sendToQueue(replyTo, buffer, { correlationId });
    }
}

The core thing changed from the standard ServerRMQ is setupChannel() part where we now pass noAck: false and acknowledge manually in the finally part of the this.handleMessage(msg) using this.channel.ack(msg).

Upvotes: 4

Vladyslav Usenko
Vladyslav Usenko

Reputation: 2376

What you want to have is usually accomplished with consumer acknowledgments. You can read about them here. In short, your consumer (in your case Nest.js microservice), that has prefetch count set to 1, will receive a new message only after it acknowledges a previous one. If you are familiar with AWS SQS, this operation is similar to deleting message from the queue.

Nest.js uses amqplib under the hood for communicating with RabbitMQ. Consumer acknowledgment policy is established during channel creation - you can see there's a noAck option. However, the channel is created with noAck set to true - you can check it here, which means that it's the listener who automatically acknowledges messages when they are passed to your @EventHandler method. You can verify that with RabbitMQ management plugin, that provides handy UI and ability to check non acked messages in flight.

I failed to find any useful info about that both in Nest.js sources and docs. But this might give you a hint.

Upvotes: 5

Related Questions