Reputation: 869
I'm creating microservice by nestjs, transfer throw rabbitmq. How to make microservice receive messages from queue in turn waiting for complete of the previous one.
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { Transport } from '@nestjs/microservices';
async function bootstrap() {
const app = await NestFactory.createMicroservice(AppModule, {
transport: Transport.RMQ,
options: {
urls: [`amqp://localhost:5672`],
queue: 'rmq_queue',
queueOptions: { durable: false },
prefetchCount: 1,
},
});
await app.listenAsync();
}
bootstrap();
import { Controller, Logger } from '@nestjs/common';
import { EventPattern } from '@nestjs/microservices';
@Controller()
export class AppController {
@EventPattern('hello')
async handleHello(): Promise<void> {
Logger.log('-handle-');
await (new Promise(resolve => setTimeout(resolve, 5000)));
Logger.log('---hello---');
}
}
const { ClientRMQ } = require('@nestjs/microservices');
(async () => {
const client = new ClientRMQ({
urls: ['amqp://localhost:5672'],
queue: 'rmq_queue',
queueOptions: { durable: false },
});
await client.connect();
for (let i = 0; i < 3; i++) {
client.emit('hello', 0).subscribe();
}
})();
https://github.com/heySasha/nest-rmq
Actual output:
[Nest] 9560 - 05/14/2019, 1:53 PM -handle- +2ms
[Nest] 9560 - 05/14/2019, 1:53 PM -handle- +9ms
[Nest] 9560 - 05/14/2019, 1:53 PM -handle- +12ms
[Nest] 9560 - 05/14/2019, 1:54 PM ---hello--- +4967ms
[Nest] 9560 - 05/14/2019, 1:54 PM ---hello--- +2ms
[Nest] 9560 - 05/14/2019, 1:54 PM ---hello--- +1ms
But i expect:
[Nest] 9560 - 05/14/2019, 1:53 PM -handle- +2ms
[Nest] 9560 - 05/14/2019, 1:54 PM ---hello--- +5067ms
[Nest] 9560 - 05/14/2019, 1:53 PM -handle- +2ms
[Nest] 9560 - 05/14/2019, 1:54 PM ---hello--- +5067ms
[Nest] 9560 - 05/14/2019, 1:53 PM -handle- +2ms
[Nest] 9560 - 05/14/2019, 1:54 PM ---hello--- +5067ms
Upvotes: 5
Views: 6626
Reputation: 21
You should add noAck: false
to main.ts.
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { Transport } from '@nestjs/microservices';
async function bootstrap() {
const app = await NestFactory.createMicroservice(AppModule, {
transport: Transport.RMQ,
options: {
urls: [`amqp://localhost:5672`],
queue: 'rmq_queue',
queueOptions: { durable: false },
noAck: false,
prefetchCount: 1,
},
});
await app.listenAsync();
}
bootstrap();
Additionally, you have to add context
to the controller and make the ack
nowledgment.
import { Controller, Logger } from '@nestjs/common';
import { Ctx, EventPattern, RmqContext } from '@nestjs/microservices';
@Controller()
export class AppController {
@EventPattern('hello')
async handleHello(@Ctx() context: RmqContext): Promise<void> {
Logger.log('-handle-');
await (new Promise(resolve => setTimeout(resolve, 5000)));
Logger.log('---hello---');
const channel = context.getChannelRef();
const originalMsg = context.getMessage();
channel.ack(originalMsg);
}
}
Upvotes: 2
Reputation: 869
I have written custom strategy.
import { isString, isUndefined } from '@nestjs/common/utils/shared.utils';
import { Observable } from 'rxjs';
import { CustomTransportStrategy, RmqOptions, Server } from '@nestjs/microservices';
import {
CONNECT_EVENT, DISCONNECT_EVENT, DISCONNECTED_RMQ_MESSAGE, NO_MESSAGE_HANDLER,
RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT,
RQM_DEFAULT_PREFETCH_COUNT,
RQM_DEFAULT_QUEUE, RQM_DEFAULT_QUEUE_OPTIONS,
RQM_DEFAULT_URL,
} from '@nestjs/microservices/constants';
let rqmPackage: any = {};
export class ServerRMQ extends Server implements CustomTransportStrategy {
private server: any = null;
private channel: any = null;
private readonly urls: string[];
private readonly queue: string;
private readonly prefetchCount: number;
private readonly queueOptions: any;
private readonly isGlobalPrefetchCount: boolean;
constructor(private readonly options: RmqOptions['options']) {
super();
this.urls = this.getOptionsProp(this.options, 'urls') || [RQM_DEFAULT_URL];
this.queue =
this.getOptionsProp(this.options, 'queue') || RQM_DEFAULT_QUEUE;
this.prefetchCount =
this.getOptionsProp(this.options, 'prefetchCount') ||
RQM_DEFAULT_PREFETCH_COUNT;
this.isGlobalPrefetchCount =
this.getOptionsProp(this.options, 'isGlobalPrefetchCount') ||
RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT;
this.queueOptions =
this.getOptionsProp(this.options, 'queueOptions') ||
RQM_DEFAULT_QUEUE_OPTIONS;
this.loadPackage('amqplib', ServerRMQ.name, () => require('amqplib'));
rqmPackage = this.loadPackage(
'amqp-connection-manager',
ServerRMQ.name,
() => require('amqp-connection-manager'),
);
}
public async listen(callback: () => void): Promise<void> {
await this.start(callback);
}
public close(): void {
if (this.channel) {
this.channel.close();
}
if (this.server) {
this.server.close();
}
}
public async start(callback?: () => void) {
this.server = this.createClient();
this.server.on(CONNECT_EVENT, (_: any) => {
this.channel = this.server.createChannel({
json: false,
setup: (channel: any) => this.setupChannel(channel, callback),
});
});
this.server.on(DISCONNECT_EVENT, (err: any) => {
this.logger.error(DISCONNECTED_RMQ_MESSAGE);
});
}
public createClient<T = any>(): T {
const socketOptions = this.getOptionsProp(this.options, 'socketOptions');
return rqmPackage.connect(this.urls, socketOptions);
}
public async setupChannel(channel: any, callback: () => void) {
await channel.assertQueue(this.queue, this.queueOptions);
await channel.prefetch(this.prefetchCount, this.isGlobalPrefetchCount);
channel.consume(
this.queue,
(msg: any) => this.handleMessage(msg)
.then(() => this.channel.ack(msg)) // Ack message after complete
.catch(err => {
// error handling
this.logger.error(err);
return this.channel.ack(msg);
}),
{ noAck: false },
);
callback();
}
public async handleMessage(message: any): Promise<void> {
const { content, properties } = message;
const packet = JSON.parse(content.toString());
const pattern = isString(packet.pattern)
? packet.pattern
: JSON.stringify(packet.pattern);
if (isUndefined(packet.id)) {
return this.handleEvent(pattern, packet);
}
const handler = this.getHandlerByPattern(pattern);
if (!handler) {
const status = 'error';
return this.sendMessage(
{ status, err: NO_MESSAGE_HANDLER },
properties.replyTo,
properties.correlationId,
);
}
const response$ = this.transformToObservable(
await handler(packet.data),
) as Observable<any>;
const publish = <T>(data: T) =>
this.sendMessage(data, properties.replyTo, properties.correlationId);
if (response$) {
this.send(response$, publish);
}
}
public sendMessage<T = any>(
message: T,
replyTo: any,
correlationId: string,
): void {
const buffer = Buffer.from(JSON.stringify(message));
this.channel.sendToQueue(replyTo, buffer, { correlationId });
}
}
The core thing changed from the standard ServerRMQ
is setupChannel()
part where we now pass noAck: false
and acknowledge manually in the finally part of the this.handleMessage(msg)
using this.channel.ack(msg)
.
Upvotes: 4
Reputation: 2376
What you want to have is usually accomplished with consumer acknowledgments. You can read about them here. In short, your consumer (in your case Nest.js microservice), that has prefetch count set to 1, will receive a new message only after it acknowledges a previous one. If you are familiar with AWS SQS, this operation is similar to deleting message from the queue.
Nest.js uses amqplib under the hood for communicating with RabbitMQ. Consumer acknowledgment policy is established during channel creation - you can see there's a noAck
option. However, the channel is created with noAck
set to true
- you can check it here, which means that it's the listener who automatically acknowledges messages when they are passed to your @EventHandler
method. You can verify that with RabbitMQ management plugin, that provides handy UI and ability to check non acked messages in flight.
I failed to find any useful info about that both in Nest.js sources and docs. But this might give you a hint.
Upvotes: 5