Reputation: 81
I have to implement WebSocket communication in my NestJS app. I've successfully set up the WebSocket gateway and tested it with Postman. My code looks like this:
export class SocketIOAdapter extends IoAdapter {
  constructor(private app: INestApplicationContext, private configService: ConfigService) {
    super(app);
  }

  createIOServer(port: number, options: ServerOptions) {
    const clientPort = parseInt(this.configService.getOrThrow("PORT"));
    const cors = {
      origin: [
        `http://localhost:${clientPort}`,
        // This is a string passed to RegExp, not a regex literal:
        // no /.../ delimiters, and backslashes must be doubled.
        new RegExp(`^http://192\\.168\\.1\\.([1-9]|[1-9]\\d):${clientPort}$`),
      ],
    };
    const optionsWithCORS: ServerOptions = {
      ...options,
      cors,
    };
    const server: Server = super.createIOServer(port, optionsWithCORS);
    const orderRepository = this.app.get(OrderRepository);
    // Run the token middleware for every connection to the /orders namespace
    server
      .of("/orders")
      .use(createTokenMiddleware(orderRepository));
    return server;
  }
}

const createTokenMiddleware =
  (orderRepository: OrderRepository) =>
  async (socket: Socket, next) => {
    try {
      // here I run some logic using my order repository
      next();
    } catch {
      next(new Error("FORBIDDEN"));
    }
  };
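A CORS origin pattern built with new RegExp from a template literal is easy to get wrong, so here is a small standalone check. It is only a sketch: the port value and the variable names are made up for illustration. It contrasts a pattern wrapped in /.../ with single backslashes against one written without delimiters and with doubled backslashes.

```typescript
// Hypothetical port; in the adapter it would come from ConfigService.
const clientPort = 3000;

// Inside a template literal, "\." becomes "." and "\d" becomes "d", and the
// surrounding slashes are treated as literal characters of the pattern.
const withDelimiters = new RegExp(`/^http:\/\/192\.168\.1\.([1-9]|[1-9]\d):${clientPort}$/`);

// Same intent, written as a plain pattern string with doubled backslashes.
const withoutDelimiters = new RegExp(`^http://192\\.168\\.1\\.([1-9]|[1-9]\\d):${clientPort}$`);

const lanOrigin = "http://192.168.1.42:3000";
console.log(withDelimiters.test(lanOrigin));    // false
console.log(withoutDelimiters.test(lanOrigin)); // true
```

The first pattern can never match because it requires a literal "/" before the start-of-input anchor.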
And
@WebSocketGateway({
  namespace: "/orders",
})
@Injectable()
export class OrderGateway implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect {
  private readonly logger = new Logger(OrderGateway.name);

  @WebSocketServer() io: Namespace;

  afterInit(): void {
    this.logger.log("Websocket Gateway initialized.");
  }

  async handleConnection(client: Socket) {
    const sockets = this.io.sockets;
    // here I run some logic to know which rooms to use for this client
    const roomsToJoin = [...]
    await client.join(roomsToJoin);
  }

  async handleDisconnect(client: Socket) {
    this.logger.log(`Disconnected socket id: ${client.id}`);
  }

  public emitOrderStatusChangeNotification(order: OrderDTO) {
    this.io
      .to("Here I put some roomId that depends on order")
      .emit("order_status_changed", JSON.stringify(order));
  }
}
Now, whenever I want to send a notification, I inject the OrderGateway and call emitOrderStatusChangeNotification. This works fine; however, my app is deployed on several instances behind a load balancer. That breaks this approach, because socket clients may be connected to a different server from the one sending the notification. So the next step to scale WebSockets (as far as I understand) is to use a broker. I tried to use Redis pub/sub in the following way. I have these two classes:
@Injectable()
export class NotificationPublisherService {
  constructor(@Inject("ORDER_NOTIFICATION_SERVICE") private client: ClientProxy) {}

  async publishEvent(order: OrderDTO) {
    console.log("will emit to redis");
    this.client.emit(Constants.notificationEventName, order);
  }
}

@Controller()
export class NotificationSuscriberController {
  private readonly logger = new Logger(NotificationSuscriberController.name);

  constructor(private readonly orderGateway: OrderGateway) {}

  @EventPattern(Constants.notificationEventName)
  async handleOrderStatusChangeEvent(order: OrderDTO) {
    try {
      this.orderGateway.emitOrderStatusChangeNotification(order);
    } catch (err) {
      this.logger.log("error sending notification");
    }
  }
}
As you can see, I'm injecting orderGateway into the class whose method handles the data from Redis, and in that handler I send the notification. Finally, I replaced all invocations of emitOrderStatusChangeNotification with calls to the publishEvent method of NotificationPublisherService. After doing this, the flow works well except for the last step: the data is put on Redis and read by the subscriber, which tries to send the WebSocket notification. However, when I log the connected clients for that room inside emitOrderStatusChangeNotification, there are none, even though I confirmed there were connected clients in that room (by logging the list of connected clients after client.join in the handleConnection method of OrderGateway). My best guess is that one instance of OrderGateway handles the socket connection and a different instance of OrderGateway processes the data from the Redis broker. I tried explicitly setting the scope of the gateway to Default to guarantee that my app has only one instance of OrderGateway (I also confirmed that it has no request-scoped dependency that could bubble up and make it non-default-scoped). It did not work and I'm out of ideas. Does anyone know what could be happening? Thanks in advance.
EDIT
As Gregorio suggested in the answers, I had to extend my adapter as explained in the docs. The following code worked for me:
export class SocketIOAdapter extends IoAdapter {
  private adapterConstructor: ReturnType<typeof createAdapter>;

  constructor(private app: INestApplicationContext, private configService: ConfigService) {
    super(app);
  }

  // Must be awaited before the server is created, so the adapter is ready
  async connectToRedis(): Promise<void> {
    const pubClient = createClient({ url: "redis://localhost:6379" });
    const subClient = pubClient.duplicate();
    await Promise.all([pubClient.connect(), subClient.connect()]);
    this.adapterConstructor = createAdapter(pubClient, subClient);
  }

  createIOServer(port: number, options: ServerOptions) {
    const clientPort = parseInt(this.configService.getOrThrow("PORT"));
    const cors = {
      origin: [
        `http://localhost:${clientPort}`,
        // This is a string passed to RegExp, not a regex literal:
        // no /.../ delimiters, and backslashes must be doubled.
        new RegExp(`^http://192\\.168\\.1\\.([1-9]|[1-9]\\d):${clientPort}$`),
      ],
    };
    const optionsWithCORS: ServerOptions = {
      ...options,
      cors,
    };
    const server: Server = super.createIOServer(port, optionsWithCORS);
    const orderRepository = this.app.get(OrderRepository);
    // Attach the Redis adapter so broadcasts reach sockets on other instances
    server
      .adapter(this.adapterConstructor)
      .of(`/orders`)
      .use(createTokenMiddleware(orderRepository));
    return server;
  }
}

const createTokenMiddleware =
  (orderRepository: OrderRepository) =>
  async (socket: Socket, next) => {
    try {
      // here I run some logic using my order repository
      next();
    } catch {
      next(new Error("FORBIDDEN"));
    }
  };
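And in my main.ts:
const redisIoAdapter = new SocketIOAdapter(app, configService);
await redisIoAdapter.connectToRedis();
// Register the adapter with the app, as in the NestJS adapter docs
app.useWebSocketAdapter(redisIoAdapter);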
Upvotes: 2
Views: 1112
Reputation: 33
Have you tried following this page from the NestJS docs? I think it might help with what you're looking for. You should add to your SocketIOAdapter what it describes there in order to connect to Redis; it is not necessary to have the NotificationPublisherService or the NotificationSuscriberController.
https://docs.nestjs.com/websockets/adapter
Upvotes: 1
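To illustrate why a shared adapter helps, here is a minimal in-memory sketch. It is not Socket.IO's or the Redis adapter's real implementation; MessageBus, ServerInstance, and the room and socket names are hypothetical stand-ins. Each instance only knows its own sockets, so an emit from another instance reaches them only if it is relayed through something every instance subscribes to.

```typescript
type Listener = (room: string, event: string, payload: string) => void;

// Stand-in for Redis pub/sub: every subscriber sees every published message.
class MessageBus {
  private listeners: Listener[] = [];

  subscribe(listener: Listener): void {
    this.listeners.push(listener);
  }

  publish(room: string, event: string, payload: string): void {
    this.listeners.forEach((listener) => listener(room, event, payload));
  }
}

// One server process: its room membership is local to that process.
class ServerInstance {
  private rooms = new Map<string, Set<string>>();
  private bus: MessageBus | undefined;
  readonly delivered: string[] = [];

  constructor(bus?: MessageBus) {
    this.bus = bus;
    if (bus) {
      bus.subscribe((room, event, payload) => this.deliverLocally(room, event, payload));
    }
  }

  join(socketId: string, room: string): void {
    const members = this.rooms.get(room) ?? new Set<string>();
    members.add(socketId);
    this.rooms.set(room, members);
  }

  // With a bus, the emit is relayed to every instance; without one it stays local.
  emitToRoom(room: string, event: string, payload: string): void {
    if (this.bus) {
      this.bus.publish(room, event, payload);
    } else {
      this.deliverLocally(room, event, payload);
    }
  }

  private deliverLocally(room: string, event: string, payload: string): void {
    const members = this.rooms.get(room);
    if (!members) return;
    members.forEach((socketId) => this.delivered.push(`${socketId}:${event}:${payload}`));
  }
}

// No shared bus: the client is on one instance, the emit happens on another.
const isolatedA = new ServerInstance();
const isolatedB = new ServerInstance();
isolatedA.join("socket-1", "order-7");
isolatedB.emitToRoom("order-7", "order_status_changed", "{}");
console.log(isolatedA.delivered.length); // 0

// Shared bus: the emit on one instance reaches the client on the other.
const bus = new MessageBus();
const sharedA = new ServerInstance(bus);
const sharedB = new ServerInstance(bus);
sharedA.join("socket-1", "order-7");
sharedB.emitToRoom("order-7", "order_status_changed", "{}");
console.log(sharedA.delivered.length); // 1
```

In the real setup the adapter plays the role of the bus, which is why the gateway can keep calling to(...).emit(...) directly without a separate publisher or subscriber class.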