Sciid
Sciid

Reputation: 39

Node.js: EventEmitter with mqtt publishing in event handlers

I am struggling in implementing the "event bus" concept in a node.js script (will be run as a service, but right now it is not working even in the debugger).

The idea is to have file called EventBus.js which will have the event handlers and can also emit events for other scripts using the bus. Events can optionally lead to publishing a value on a MQTT broker.

This system works well when the event bus is used in routes served by Express, but does not when run from a standalone script.

Events are properly propagated, I reached the publish normally, client is connected, but nothing is published on the MQTT broker, when the client.publish() is in a event handler. I reach the publish() callback without error. Breakpoints on mqtt close and error are not reached.

I built a very simplified script to show my problem, testEvent.js, below:

const emitter = require("./config/EventBus.js");

emitter.emit("lcl_main_start");

const dao = new AppDAO(constants.dbFile);
const siteDao = new SiteDAO(dao);

emitter.on("eventBus_init", () => {
    try {
      let tick = new dayjs();
      let localSite = siteDao.getActive().then(localSite => {
        emitter.emit("setSysDetails", "buildUpTime", (dayjs() - tick)); <-- triggered normally
        emitter.emit("close mqtt"); <-- triggered normally
        console.log(true);
        process.exit(1);
      }).catch(err => console.log(err));
    } catch (err) {
        emitter.emit("close mqtt");
        console.log(err);
        process.exit(1);
    }

Here is the eventBus file (simplified):

const mqtt = require('mqtt');

const EventEmitter = require('events');
const emitter = new EventEmitter();

emitter.on('uncaughtException', function (err) {
    logger.error("uncaughtException : " + err.message)
});

let mqttOptions = {
    clientId: constants_sys.LCL_MQTT.CLIENT_ID_DEBUG,
    username: constants_sys.LCL_MQTT.USERNAME_MAIN, //TEMP
    password: constants_sys.LCL_MQTT.PASSWORD_MAIN,
    keepAlive: 1000,
    reconnectPeriod: '1000',
    clean: true,
    ca: fs.readFileSync(constants_sys.LCL_MQTT.CA_PATH),
    rejectUnauthorized: false, 
};
  let client = mqtt.connect(constants_sys.LCL_MQTT.HOST, mqttOptions);
  client.on("connect", () => {  
    client.publish("STATUS/eventBus/connected", JSON.stringify(1), {qos: 0, retain: true}, (err) => {
        if (err) logger.error("MQTT publish error [STATUS/eventBus/connected] : " + err.message);
        else emitter.emit("eventBus_init");   <-- this first publish works
    });
});
client.on('error',(err) => {
    client.end();  <-- Not reached
    logger.error('MQTT client init error: ' + err.message);
});

client.on('close', () => {
    logger.info('MQTT connection closed');
});
// Here I have some publish which works well, outside of an event handler (data published at startup)

//publish in event handlers works only if the emitter is in a route served by express

emitter.on("setSysDetails", function (param, val) {
    let ts = new dayjs().format("YYYY-MM-DDTHH:mm:ssZ[Z]");
    if (client.connected) client.publish("STATUS/systemDetails/" + param, JSON.stringify({val: val, timestamp: ts}), {qos: 0, retain: true}, (err) => {
        if (err) {   <-- reach here without errors when setSysDetails is emitted
            reject(err);
        }
    });
});
emitter.on('close mqtt', () => {
    client.publish("STATUS/systemDetails/connected", JSON.stringify(0), {qos: 0, retain: true}, (err) => {
        if (err) logger.error("MQTT publish error [STATUS/eventBus/connected] : " + err.message);
        client.end();
    });
    logger.info('MQTT connection closed.');
});

module.exports = emitter;

Any idea what I am doing wrong?

Thanks

UPDATE

This example test script might not be the best as the 2 emit (setSysDetails and close mqtt) are emitted at the same time (not the case in the actual script), I will update with a better example

Upvotes: 0

Views: 40

Answers (1)

Sciid
Sciid

Reputation: 39

After a day on it, I just noticed this was only happening when debugging, when in production there is no problem anymore.

I suspect that due to some bugs later in the code, the script was exiting before publishing on the MQTT, although I am curious as to why a breakpoint did not make the eventBus publish?

A good tip in such a situation is to place a good old setTimeout for 2 second to check if it is the cause.

Hope that helps if other people have the same issue.

Upvotes: 0

Related Questions