Neil Stevens

Reputation: 3904

NodeJS + WS: how to access the currently running WS server instance

I have implemented a simple REST API using NodeJS, ExpressJS and routing-controllers. I have also implemented a basic WebSocket server, using ws, that runs alongside the REST API.

const app = express();

app.use(bodyParser.json({limit: "50mb"}));
app.use(bodyParser.urlencoded({limit: "50mb", extended: true}));

useExpressServer(app, {
    controllers: [
        UserController
    ]
});

const server = app.listen(21443, (err: Error) => {
    console.log("listening on port 21443");
});

const wss = new WebSocket.Server({server});

wss.on("connection", (ws: WebSocket) => {
    ws.on("message", (message: string) => {
        console.log("received: %s", message);
        ws.send(`Hello, you sent -> ${message}`);
    });

    ws.send("Hi there, I am a WebSocket server");
});

My question is: how do I get access to the currently running WS instance so that I can send or broadcast from my controller methods? I have a number of POST methods that start long-running processes, so they return an HTTP 200 to the client immediately. Once the process finishes, I would like to either send to one connected WS client or broadcast to all of them.

What is the correct way to access the WebSocket.Server instance from within my controller classes?

Upvotes: 2

Views: 962

Answers (2)

Dashiell Rose Bark-Huss

Reputation: 2965

You can create the websocket earlier and pass the instance around:

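const server = http.createServer(app);
const notifier = new NotifierService();
notifier.connect(server);

app.get("/somethingHappened", (req, res) => {
  notifier.broadcast("new notification!!");
  // end the HTTP request; without a response the client would hang
  res.sendStatus(200);
});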

app.use(routes(notifier))

Full code:

app.js

Pass the websocket to the other routes:

const express = require("express");
const http = require("http");
const NotifierService = require("../server/NotifierService.js");
const routes = require("./routes");

const app = express();
const server = http.createServer(app);
const notifier = new NotifierService();
notifier.connect(server);

app.get("/somethingHappened", (req, res) => {
  notifier.broadcast("new notification!!");
  // end the HTTP request; without a response the client would hang
  res.sendStatus(200);
});

//   to demonstrate how the notifier instance can be
//   passed around to different routes
app.use(routes(notifier));

server
  .listen(4000)
  .on("listening", () =>
    console.log("info", `HTTP server listening on port 4000`)
  );

NotifierService.js class that handles the websocket

const url = require("url");
const { Server } = require("ws");

class NotifierService {
  constructor() {
    this.connections = new Map();
  }

  connect(server) {
    this.server = new Server({ noServer: true });
    this.interval = setInterval(this.checkAll.bind(this), 10000);
    this.server.on("close", this.close.bind(this));
    this.server.on("connection", this.add.bind(this));
    server.on("upgrade", (request, socket, head) => {
      console.log("ws upgrade");
      const id = url.parse(request.url, true).query.storeId;

      if (id) {
        this.server.handleUpgrade(request, socket, head, (ws) =>
          this.server.emit("connection", id, ws)
        );
      } else {
        socket.destroy();
      }
    });
  }

  add(id, socket) {
    console.log("ws add");
    socket.isAlive = true;
    socket.on("pong", () => (socket.isAlive = true));
    socket.on("close", this.remove.bind(this, id));
    this.connections.set(id, socket);
  }

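  send(id, message) {
    console.log("ws sending message");

    const connection = this.connections.get(id);

    // No socket is registered under this id (never connected, or already closed).
    if (!connection) {
      return false;
    }

    connection.send(JSON.stringify(message));
    return true;
  }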

  broadcast(message) {
    console.log("ws broadcast");
    this.connections.forEach((connection) =>
      connection.send(JSON.stringify(message))
    );
  }

  isAlive(id) {
    return !!this.connections.get(id);
  }

  checkAll() {
    this.connections.forEach((connection) => {
      if (!connection.isAlive) {
        return connection.terminate();
      }

      connection.isAlive = false;
      connection.ping("");
    });
  }

  remove(id) {
    this.connections.delete(id);
  }

  close() {
    clearInterval(this.interval);
  }
}

module.exports = NotifierService;

routes.js

const express = require("express");

const router = express.Router();
module.exports = (webSocketNotifier) => {
  router.post("/newPurchase/:id", (req, res, next) => {
    webSocketNotifier.send(req.params.id, "purchase made");
    res.status(200).send();
  });
  return router;
};
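
For the long-running POST handlers described in the question, the same injected notifier can be used after the HTTP response has already gone out. The following is only a sketch: `makeLongTaskHandler`, `runLongProcess` and the `{ status, result }` payload shape are hypothetical names chosen for illustration; only `notifier.broadcast` comes from the class above.

```javascript
// Hypothetical handler factory. `runLongProcess` stands in for whatever slow
// work the POST endpoint starts; it is not part of the original code.
function makeLongTaskHandler(notifier, runLongProcess) {
  return (req, res) => {
    // Acknowledge the request straight away so the HTTP client is not kept waiting.
    res.status(200).send();

    // Carry on in the background and tell the WebSocket clients how it ended.
    // The promise is returned only so that callers such as tests can await it.
    return Promise.resolve()
      .then(() => runLongProcess(req.body))
      .then(
        (result) => notifier.broadcast({ status: "done", result }),
        (err) => notifier.broadcast({ status: "failed", error: String(err) })
      );
  };
}
```

Inside routes.js this could be wired up as `router.post("/import", makeLongTaskHandler(webSocketNotifier, importData))`, where both the path and `importData` are placeholders.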

Upvotes: 1

Valera

Reputation: 2923

The list of connected clients is stored in the wss object, as wss.clients. You can loop through it and pick out a particular client like this:

wss.clients.forEach((client) => {
    if (client.userId === current_user_id && client.readyState === WebSocket.OPEN) {
        // this is the socket of your current user
    }
})

Now you need to somehow identify your client. You can do it by assigning some id to this client on connection:

wss.on('connection', async (ws, req) => {
    // req.url is the url that user connected with
    // use a query parameter on connection, or an authorization token by which you can identify the user
    // so your connection url will look like
    // ws://example.com/socket?token=your_token
    ws.userId = your_user_identifier
    ....
})
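
One way to read such a token out of req.url is sketched below. The helper name `getTokenFromRequestUrl`, the `token` parameter name and the `lookUpUserId` call in the usage comment are assumptions made for illustration; how a token maps to a user is up to your application.

```javascript
// Extract the `token` query parameter from the upgrade request URL.
// req.url holds only the path and query string (e.g. "/socket?token=abc"),
// so the URL constructor needs a placeholder base to parse it.
function getTokenFromRequestUrl(requestUrl) {
  const parsed = new URL(requestUrl, "ws://placeholder");
  return parsed.searchParams.get("token"); // null when no token was supplied
}

// Possible use inside the connection handler (lookUpUserId is hypothetical):
//
// wss.on("connection", (ws, req) => {
//   const token = getTokenFromRequestUrl(req.url);
//   if (!token) {
//     ws.close(1008, "missing token"); // 1008 = policy violation
//     return;
//   }
//   ws.userId = lookUpUserId(token);
// });
```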

To broadcast use:

wss.clients.forEach((client) => {
    if (client.readyState === WebSocket.OPEN) {
        client.send(data);
    }
});

If your controller and socket code live in different files (and I am sure they will), you will have to export the wss object from your socket file and import it in the controller.
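
A small module that owns the instance is one way to arrange that. This is a sketch under assumptions: the file name socket.js and the functions `setWss` and `broadcast` are made up for illustration, and the constant 1 mirrors the value of WebSocket.OPEN.

```javascript
// socket.js (hypothetical file name): holds the single shared server instance.
const OPEN = 1; // same value as WebSocket.OPEN

let wss = null;

// Call once at startup, right after `new WebSocket.Server({ server })`.
function setWss(instance) {
  wss = instance;
}

// Send `data` to every client whose connection is open.
// Returns the number of clients the message was sent to.
function broadcast(data) {
  if (!wss) {
    return 0; // server not created yet, nothing to send to
  }
  let sentCount = 0;
  wss.clients.forEach((client) => {
    if (client.readyState === OPEN) {
      client.send(data);
      sentCount += 1;
    }
  });
  return sentCount;
}

// In a real file these would be exported, for example:
// module.exports = { setWss, broadcast };
```

The startup code would call `setWss(wss)` once, and a controller method would then import `broadcast` and call it after its long-running work completes.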

Upvotes: 0
