Juan Chaves
Juan Chaves

Reputation: 81

Nest.js Websocket Gateway loosing socket connecition using redid broker

I have to implement websocket communication in my nest.js app. I've successfully setup the websocket gateway and I have tested it with postman. My code looks like this

export class SocketIOAdapter extends IoAdapter {
  constructor(private app: INestApplicationContext, private configService: ConfigService) {
    super(app);
  }

  createIOServer(port: number, options: ServerOptions) {
    const clientPort = parseInt(this.configService.getOrThrow("PORT"));

    const cors = {
      origin: [
        `http://localhost:${clientPort}`,
        new RegExp(`/^http:\/\/192\.168\.1\.([1-9]|[1-9]\d):${clientPort}$/`),
      ],
    };

    const optionsWithCORS: ServerOptions = {
      ...options,
      cors,
    };

    const server: Server = super.createIOServer(port, optionsWithCORS);

    const orderRepository = this.app.get(OrderRepository);
    server
      .of("/orders")
      .use(createTokenMiddleware(orderRepository));

    return server;
  }
}

const createTokenMiddleware =
  (orderRepository: OrderRepository) =>
  async (socket: Socket, next) => {
    // here I run some logic using my order repository
      next();
    } catch {
      next(new Error("FORBIDDEN"));
    }
  };

And

@WebSocketGateway({
  namespace: "/orders",
})
@Injectable()
export class OrderGateway implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect {
  private readonly logger = new Logger(OrderGateway.name);

  @WebSocketServer() io: Namespace;

  afterInit(): void {
    this.logger.log("Websocket Gateway initialized.");
  }

  async handleConnection(client: Socket) {
    const sockets = this.io.sockets;

    // here I run some logic to know which rooms to use for this client
    const roomsToJoin = [...]
    await client.join(roomsToJoin);
  }

  async handleDisconnect(client: Socket) {
    this.logger.log(`Disconnected socket id: ${client.id}`);
  }

  public emitOrderStatusChangeNotification(order: OrderDTO) {
    this.io
      .to("Here I put some roomId that depends on order")
      .emit("order_status_changed", JSON.stringify(order));
  }
}

Now, whenever I want to send a notification, I inject the OrderGateway and call emitOrderStatusChangeNotification. This works fine, however, my app is deployed on several instances behind a load balancer. The latter breaks this approach as socket clients may be connected to a different server from the one I'm sending the notification. So, the next step to scale web sockets (as far as I understand) is to use a broker. I tried to use Redis pub/sub in the following way. I have this two classes:

@Injectable()
export class NotificationPublisherService {
  constructor(@Inject("ORDER_NOTIFICATION_SERVICE") private client: ClientProxy) {}

  async publishEvent(order: OrderDTO) {
    console.log("will emit to redis");
    this.client.emit(Constants.notificationEventName, order);
  }
}
@Controller()
export class NotificationSuscriberController {
  private readonly logger = new Logger(NotificationSuscriberController.name);

  constructor(private readonly orderGateway: OrderGateway) {}

  @EventPattern(Constants.notificationEventName)
  async handleOrderStatusChangeEvent(order: OrderDTO) {
    try {
      this.orderGateway.emitOrderStatusChangeNotification(order);
    } catch (err) {
      this.logger.log("error sending notification");
    }
  }

As you can see, I'm injecting orderGateway in the class that have the method that handles the data from redis and in that handler I send the notification. Finally, I replaced all the invocations of emitOrderStatusChangeNotification to the publishEvent method of NotificationPublisherService. After doing this, the flow works well except from the last step. This means, the data is put on redis and read by the suscriber, which tries to send the websocket notification. However, when logging the connected clients for that room in emitOrderStatusChangeNotification method, I'm getting that there are no connected clients, even though I confirmed there where connected clients on that room (I did this by logging the list of connected clients after doing client.join in the handleConnection method of OrderGateway). My best guess is that an instance of OrderGateway handles the socket connection and a different instance of OrderGateway is processing the data from Redis broker. I tried to explicitly set the scope of the Gateway to Default to guarantee that my app has only one instance of OrderGateway (I also confirmed that it has not any request scoped dependency that could bubble up and make it not default scoped). It did not work and I'm out of ideas. Does anyone know what could be happening? Thanks in advance

EDIT

As Gregorio suggested in the answers, I had to extend my adapter as explained in the docs, the following code worked for me

export class SocketIOAdapter extends IoAdapter {
  private adapterConstructor: ReturnType<typeof createAdapter>;
  constructor(private app: INestApplicationContext, private configService: ConfigService) {
    super(app);
  }

  async connectToRedis(): Promise<void> {
    const pubClient = createClient({ url: "redis://localhost:6379" });
    const subClient = pubClient.duplicate();

    await Promise.all([pubClient.connect(), subClient.connect()]);

    this.adapterConstructor = createAdapter(pubClient, subClient);
  }

  createIOServer(port: number, options: ServerOptions) {
    const clientPort = parseInt(this.configService.getOrThrow("PORT"));

    const cors = {
      origin: [
        `http://localhost:${clientPort}`,
        new RegExp(`/^http:\/\/192\.168\.1\.([1-9]|[1-9]\d):${clientPort}$/`),
      ],
    };

    const optionsWithCORS: ServerOptions = {
      ...options,
      cors,
    };

    const server: Server = super.createIOServer(port, optionsWithCORS);

    const orderRepository = this.app.get(OrderRepository);
    server
      .adapter(this.adapterConstructor)
      .of(`/orders`)
      .use(createTokenMiddleware(orderRepository));

    return server;
  }
}

const createTokenMiddleware =
  (orderRepository: OrderRepository) =>
  async (socket: Socket, next) => {
    // here I run some logic using my order repository
      next();
    } catch {
      next(new Error("FORBIDDEN"));
    }
  };
}

and in my main.ts

const redisIoAdapter = new SocketIOAdapter(app, configService);
await redisIoAdapter.connectToRedis();

Upvotes: 2

Views: 1112

Answers (1)

gregorioospina
gregorioospina

Reputation: 33

Have you tried following this page from the nest.js docs? I think it might help you in what you're looking for. You should write in your SocketIOAdapter what it says there in order to connect with Redis, it is not necessary to have the NotificationPublisherService or the NPController.

https://docs.nestjs.com/websockets/adapter

Upvotes: 1

Related Questions