Nils Mehlhorn

Reputation: 390

What's a valid @MessagePattern for NestJS MQTT microservice?

I'm trying to set up an MQTT microservice using NestJS according to the docs.

I've started a working Mosquitto broker using Docker and verified its operability using various MQTT clients. Now, when I start the NestJS service, it seems to connect correctly (MQTT.fx shows a new client), yet I am unable to receive any messages in my controllers. This is my bootstrapping, just like in the docs:

main.ts

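import { NestFactory } from '@nestjs/core';
import { Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';

async function bootstrap() {
    const app = await NestFactory.createMicroservice(AppModule, {
        transport: Transport.MQTT,
        options: {
            host: 'localhost',
            port: 1883,
            protocol: 'tcp'
        }
    });
    app.listen(() => console.log('Microservice is listening'));
}
bootstrap();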

app.controller.ts

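import { Controller } from '@nestjs/common';
import { MessagePattern } from '@nestjs/microservices';

@Controller()
export class AppController {

    @MessagePattern('mytopic') // tried {cmd:'mytopic'} or {topic:'mytopic'}
    root(msg: Buffer) {
        console.log('received: ', msg)
    }
}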

Am I using the @MessagePattern decorator incorrectly, or is my understanding of what a NestJS MQTT microservice is supposed to do wrong? I assumed it would subscribe to the topic I pass to the decorator. My only other source of information is the corresponding unit tests.

Upvotes: 6

Views: 19560

Answers (4)

Deskaria

Reputation: 11

@Tanas is right. The NestJS microservice now listens on $[topic] and replies on $[topic]/reply. The _ack and _res suffixes are deprecated.

For example:

  @MessagePattern('helloWorld')
  getHello(): string {
    console.log("hello world")
    return this.appService.getHello();
  }

Now listens on topic: helloWorld
Now replies on topic: helloWorld/reply
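
To make the two naming schemes mentioned in this thread easier to compare, here is a minimal sketch. The helper `topicsFor` and the convention names are hypothetical (they are not part of NestJS); the sketch only encodes the topic names described in the answers here.

```typescript
// Hypothetical helper, NOT part of NestJS: maps a @MessagePattern string to
// the request/reply topics described in this thread.
type Convention = 'reply-suffix' | 'legacy-ack-res';

interface TopicPair {
  request: string; // topic an external client publishes to
  reply: string;   // topic an external client subscribes to
}

function topicsFor(pattern: string, convention: Convention = 'reply-suffix'): TopicPair {
  if (convention === 'legacy-ack-res') {
    // Older behaviour: publish to <pattern>_ack, read from <pattern>_res.
    return { request: `${pattern}_ack`, reply: `${pattern}_res` };
  }
  // Newer behaviour: publish to <pattern>, read from <pattern>/reply.
  return { request: pattern, reply: `${pattern}/reply` };
}

console.log(topicsFor('helloWorld'));
console.log(topicsFor('sum', 'legacy-ack-res'));
```

Which scheme applies depends on the installed NestJS version, so it is worth checking against your broker with a wildcard subscription.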

Regarding ID

You should also provide an id within the payload (see @Hakier's answer); NestJS will then reply with an answer containing your id. Without an id there won't be any reply, but the handler logic will still run.

For example (using the snippet from above):
Your message:

{"data":"foo","id":"bar"}

NestJS reply:

{"response":"Hello World!","isDisposed":true,"id":"bar"}

Without id:

Your message:

{"data":"foo"} or {}

No reply, but "hello world" is still logged in the terminal.

Upvotes: 1

Hakier

Reputation: 505

I was fighting with MQTT today and this helped me a little, but I ran into more problems. Here are my findings:

Wrong way of configuring the broker URL

In my case, with a non-local MQTT server, I started with this:

  const app = await NestFactory.createMicroservice(AppModule, {
    transport: Transport.MQTT,
    options: {
      host: 'test.mosquitto.org',
      port: 1883,
      protocol: 'tcp',
    },
  });
  await app.listenAsync();

However, as you can see in the constructor of ServerMqtt, only the url option is used (when it is not provided, it falls back to 'mqtt://localhost:1883'). Since I do not run a local MQTT broker, app.listenAsync(), which resolves only on connect, never resolves, and no handler ever runs.

It started to work once I changed the code to use the url option:

  const app = await NestFactory.createMicroservice(AppModule, {
    transport: Transport.MQTT,
    options: {
      url: 'mqtt://test.mosquitto.org:1883',
    },
  });
  await app.listenAsync();
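
If your configuration already exists as separate host/port values, a small conversion step can build the url string. The following is only a sketch: `toBrokerUrl` and `HostPortOptions` are hypothetical names, and treating 'tcp' as the plain 'mqtt' scheme is an assumption made for this example.

```typescript
// Hypothetical helper, NOT part of NestJS: turns host/port style settings
// into the single url string that the url option expects.
interface HostPortOptions {
  host: string;
  port?: number;     // defaults to the standard unencrypted MQTT port
  protocol?: string; // e.g. 'mqtt', 'mqtts', 'tcp'
}

function toBrokerUrl({ host, port = 1883, protocol = 'mqtt' }: HostPortOptions): string {
  // Assumption for this sketch: 'tcp' means plain MQTT over TCP.
  const scheme = protocol === 'tcp' ? 'mqtt' : protocol;
  return `${scheme}://${host}:${port}`;
}

console.log(toBrokerUrl({ host: 'test.mosquitto.org', port: 1883, protocol: 'tcp' }));
```

The resulting string can then be passed as options.url in the createMicroservice call shown above.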

Messages require id property

The second, very weird problem was that when I used the non-nest.js client script from @KimKern, I had to register two MessagePatterns, sum and sum_ack:

  @MessagePattern('sum')
  sum(data: number[]): number {
    return data.reduce((a, b) => a + b, 0);
  }

  @MessagePattern('sum_ack')
  sumAck(data: number[]): number {
    return data.reduce((a, b) => a + b, 0);
  }

Using console.log, I discovered that the latter is the one being run, but only when the first one is present. You can push the same message to the broker using the mqtt CLI tool to check it:

mqtt pub -t 'sum_ack' -h 'test.mosquitto.org' -m '{"data":[1,2]}'

But the biggest problem was that it didn't reply (no message was published on sum_res).

The solution was to also provide an id when sending a message:

mqtt pub -t 'sum_ack' -h 'test.mosquitto.org' -m '{"data":[1,2], "id":"any-id"}'

Then we can remove the 'sum_ack' MessagePattern and keep only this code:

  @MessagePattern('sum')
  sum(data: number[]): number {
    return data.reduce((a, b) => a + b, 0);
  }

The reason is hidden inside the handleMessage method of ServerMqtt, which does not publish the handler's response if the message has no id.

TL;DR: Specify the message broker address using the url option only, and always provide an id with each message.

I hope this saves others some time.
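
The behaviour described above can be modelled roughly as follows. This is a simplified sketch and not the actual ServerMqtt source: `handleIncoming` and both interfaces are hypothetical, and the reply is reduced to id plus response (the real payload carries more fields).

```typescript
// Simplified model, NOT the real NestJS implementation.
interface IncomingPacket<T> {
  data: T;
  id?: string; // optional: external clients often forget it
}

interface ReplyPacket<R> {
  id: string;
  response: R;
}

// Returns the reply that would be published, or undefined when nothing is sent.
function handleIncoming<T, R>(raw: string, handler: (data: T) => R): ReplyPacket<R> | undefined {
  const packet = JSON.parse(raw) as IncomingPacket<T>;
  const response = handler(packet.data); // the handler runs either way
  if (packet.id === undefined) {
    return undefined; // no id: the result is silently dropped
  }
  return { id: packet.id, response };
}

const sum = (data: number[]): number => data.reduce((a, b) => a + b, 0);

console.log(handleIncoming('{"data":[1,2]}', sum));                // no reply
console.log(handleIncoming('{"data":[1,2],"id":"any-id"}', sum)); // reply with id
```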

Happy hacking!

Upvotes: 0

Kim Kern

Reputation: 60547

nest.js Pattern Handler

On the nest.js side we have the following pattern handler:

@MessagePattern('sum')
sum(data: number[]): number {
  return data.reduce((a, b) => a + b, 0);
}

As @Alexandre explained, this will actually listen to sum_ack.


Non-nest.js Client

A non-nest.js client could look like this (just save as client.js, run npm install mqtt and run the program with node client.js):

var mqtt = require('mqtt')
var client  = mqtt.connect('mqtt://localhost:1883')

client.on('connect', function () {
  client.subscribe('sum_res', function (err) {
    if (!err) {
      client.publish('sum_ack', '{"data": [2, 3]}');
    }
  })
})

client.on('message', function (topic, message) {
  console.log(message.toString())
  client.end()
})

It sends a message on the topic sum_ack and listens for messages on sum_res. When it receives a message on sum_res, it logs the message and ends the program. nest.js expects the message format to be {data: myData} and then calls the handler with that payload: sum(myData).

// Log:
{"err":null,"response":5} // This is the response from sum()
{"isDisposed":true} // Internal "complete event" (according to unit test)
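
A raw client has to tell these two kinds of messages apart itself. The sketch below shows one way to do that; `classifyReply` and `LegacyReply` are hypothetical names, and the message shapes are taken only from the log lines above.

```typescript
// Hypothetical helper: classifies the raw messages seen on the reply topic.
type LegacyReply =
  | { kind: 'response'; err: unknown; response: unknown }
  | { kind: 'complete' };

function classifyReply(raw: string): LegacyReply {
  const msg = JSON.parse(raw) as { err?: unknown; response?: unknown; isDisposed?: boolean };
  // A message that only signals disposal marks the end of the stream.
  if (msg.isDisposed === true && !('response' in msg)) {
    return { kind: 'complete' };
  }
  return { kind: 'response', err: msg.err ?? null, response: msg.response };
}

console.log(classifyReply('{"err":null,"response":5}'));
console.log(classifyReply('{"isDisposed":true}'));
```

With this, a client could keep its connection open until it sees the 'complete' message instead of closing after the first reply.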

Of course, this is not very convenient...


nest.js Client

That is because the server is meant to be used with a nest.js client rather than a plain MQTT client. The nest.js client abstracts all the internal logic away. See this answer, which describes the client for Redis (only two lines need to be changed for MQTT).

async onModuleInit() {
  await this.client.connect();
  // no 'sum_ack' or {data: [0, 2, 3]} needed
  this.client.send('sum', [0, 2, 3]).toPromise();
}

Upvotes: 7

Alexandre

Reputation: 3170

The documentation is not very clear, but it seems that for MQTT, if you have @MessagePattern('mytopic'), you can publish a command on the topic mytopic_ack and you will get the response on mytopic_res. I am still trying to find out how to publish to the MQTT broker from a service.

See https://github.com/nestjs/nest/blob/e019afa472c432ffe9e7330dc786539221652412/packages/microservices/server/server-mqtt.ts#L99

  public getAckQueueName(pattern: string): string {
    return `${pattern}_ack`;
  }

  public getResQueueName(pattern: string): string {
    return `${pattern}_res`;
  }

Upvotes: 4
