Kyle Corbin Hurst
Kyle Corbin Hurst

Reputation: 1121

How do I join a socket io room AFTER a redis subscription event happens?

I'm using redis pub/sub and I want to join a room after the redis sub event:

const express = require('express');
const app = express();
const socketio = require('socket.io');
const redis = require("redis");

// Start the HTTP server on port 3001; Socket.IO is attached to it below.
const expressServer = app.listen(3001, () => console.log("Express is running!"));

const io = socketio(expressServer);

// Redis client used for the pub/sub subscription.
const sub = redis.createClient({port: 6379, host: '127.0.0.1'});

// NOTE(review): only the 'message' channel is subscribed here, but the handler
// below acts only when channel === 'conversation' — confirm the intended channel.
sub.subscribe('message');


// Module-level room id shared by every connection; it stays undefined until
// the 'message' handler below assigns it.
let room;

sub.on('error', function (error) {
    console.log('ERROR ' + error);
});

sub.on('connect', function(){
    console.log('Redis client connected');
});

// On each Redis pub/sub message: parse the JSON payload and, for the
// 'conversation' channel, remember its id as the current room.
sub.on('message', async function (channel, data) {
    // Parsed before the channel check, so a non-JSON payload on any channel throws here.
    data = JSON.parse(data);
    if(channel === 'conversation'){
        room = data.id;
        console.log('room set ' + room)
    }
});

// Runs once per client connection.
io.on('connect', (socket) => {
    socket.emit('test', 'from server')

    // Room I'd like to join. `room` is read at connect time, so this is whatever
    // the last 'conversation' message set (undefined before the first one).
    socket.join(room);

    // Registered inside the connect handler, so every new connection adds
    // another 'join-room' listener. The `room` parameter shadows the outer `room`.
    io.of("/").adapter.on("join-room", (room, id) => {
        console.log(`socket ${id} has joined room ${room}`);
    });
});

Basically, what I want is for sub.on('message') to set a room state, and only after that to call socket.join(room);. What's happening right now is that the socket connects before the Redis sub event fires. On the first sub event the socket joins an undefined room, and on the next one it joins the last room set by sub.on('message').

Is there a way that I could use the Node event emitter so that a socket room is joined after the room is set by Redis? Or how would I use the socket.io Redis adapter to do this?

It looks like there's a way to do this with the new Redis socket.io adapter functions, but there are no docs on how to do it. Function: RedisAdapter.remoteJoin(id, room)

Edit

I started trying to do this with remoteJoin with the socket adapter. The problem is the socket.id

const express = require('express');
const app = express();
const socketio = require('socket.io');
const redis = require("redis");
const redisAdapter = require('@socket.io/redis-adapter');

// Start the HTTP server on port 3001 and attach Socket.IO to it.
const expressServer = app.listen(3001, () => console.log("Express is running!"));

const io = socketio(expressServer);

// `sub` is used both by the Socket.IO Redis adapter and for the application's
// own subscriptions below; `pub` is a second connection cloned from it.
const sub = redis.createClient({port: 6379, host: '127.0.0.1'});
const pub = sub.duplicate();

// NOTE(review): `redisAdapter` is the value returned by
// require('@socket.io/redis-adapter') and is called directly here — confirm it
// is callable; the package documents a named `createAdapter` export.
io.adapter(redisAdapter(pub, sub));

// Subscribe to both the 'conversation' and 'message' channels.
sub.subscribe('conversation','message', function(){
    // console.log('Subbed')
})

//sub.subscribe('conversation','message');

// Module-level state shared by every connection: the room id taken from the
// latest 'conversation' message, and the id of the socket that connected last.
let room;
// Spelled `socketId` to match the name the handlers assign and read; the
// previous spelling left those handlers using an undeclared variable.
let socketId;

// Log Redis client errors.
sub.on('error', function (error) {
    console.log('ERROR ' + error);
});

// Log once the Redis connection is established.
sub.on('connect', function(){
    console.log('Redis client connected');
});


// On each Redis pub/sub message: parse the JSON payload and, for the
// 'conversation' channel, ask the adapter to put a socket into that room.
sub.on('message', async function (channel, data) {
     // Parsed before the channel check, so a non-JSON payload on any channel throws here.
     data = JSON.parse(data);
     if(channel === 'conversation'){
        room = data.id;

        // `socketId` is a single shared variable assigned in the connect
        // handler, so this targets whichever socket connected most recently.
        // NOTE(review): confirm remoteJoin exists in the installed adapter version.
        io.of('/').adapter.remoteJoin(socketId, room);

        console.log('subbed ' + room)
    }
});

// Runs once per client connection.
io.on('connect', (socket) => {
    // Overwrites the shared variable, so only the latest connection's id is kept.
    socketId = socket.id;

    // Registered inside the connect handler, so every new connection adds
    // another 'join-room' listener on the adapter.
    io.of("/").adapter.on("join-room", (room, id) => {
        console.log(`socket ${id} has joined room ${room}`);
    });
});

The problem is socketId = socket.id; is set to the last client that connected. So all clients become the last client that connected.

Upvotes: 0

Views: 1301

Answers (1)

Shahriar Shojib
Shahriar Shojib

Reputation: 927

I have refactored your code to react to incoming messages from Redis pub/sub

This code can be reduced with the new Redis client or ioredis client which has a promise-based API.

Whenever a new client is connected, you need to subscribe for messages on Redis so that you can react to new messages while the client is connected.

The problem is when the socket is disconnected, you no longer need to receive those messages, so I created a helper function which does the following:

  • Connects to redis
  • Creates a subscription
  • When new messages arrive, it calls the callback function
  • Returns a function that, when called, unsubscribes from messages and disconnects the current Redis client.

const express = require('express');
const app = express();
const socketio = require('socket.io');
const redis = require('redis');

// Boot the HTTP server on port 3001 and log once it is listening.
const expressServer = app.listen(3001, function onListening() {
    console.log('Express is running!');
});

// Attach Socket.IO to that same HTTP server.
const io = socketio(expressServer);

/**
 * Opens a dedicated Redis connection, subscribes it to one pub/sub channel and
 * forwards the parsed JSON payload of every message on that channel to `cb`.
 *
 * @param {(data: any) => void} cb - Called with the parsed payload of each message.
 * @param {string} [channel='conversation'] - Redis channel to listen on.
 * @returns {Promise<() => Promise<void>>} Resolves once subscribed, with a
 *   function that unsubscribes and closes this Redis connection.
 * @throws Rejects with the Redis error if connecting or subscribing fails;
 *   the client is closed before rejecting.
 */
const createSubscriber = async (cb, channel = 'conversation') => {
    const client = redis.createClient({ port: 6379, host: '127.0.0.1' });

    // Wait for whichever of 'connect' / 'error' comes first, and detach the
    // other listener so the connect-phase error handler cannot close a
    // subscriber that is already up.
    await new Promise((resolve, reject) => {
        const onConnect = () => {
            client.removeListener('error', onError);
            // Keep an 'error' listener so later errors are logged instead of
            // being emitted with no handler attached.
            client.on('error', err => console.error(err));
            resolve();
        };
        const onError = reason => {
            client.removeListener('connect', onConnect);
            client.end(true);
            reject(reason);
        };
        client.once('connect', onConnect);
        client.once('error', onError);
    });

    // Attach the listener before subscribing so no message can slip through.
    client.on('message', (messageChannel, message) => {
        if (messageChannel !== channel) {
            return;
        }

        let data;
        try {
            data = JSON.parse(message);
        } catch (err) {
            // A malformed payload must not take the whole process down.
            console.error(err);
            return;
        }
        cb(data);
    });

    // Subscribe to the same channel the listener filters on; the original
    // subscribed to 'message' but filtered on 'conversation', so `cb` never ran.
    await new Promise((resolve, reject) => {
        client.subscribe(channel, err => {
            if (err) {
                client.end(true);
                reject(err);
            } else {
                resolve();
            }
        });
    });

    return async () => {
        await new Promise(resolve => {
            client.unsubscribe(channel, err => {
                if (err) {
                    console.error(err);
                }
                // Close the connection even if unsubscribing failed.
                client.end(true);
                resolve();
            });
        });
    };
};

// For every connecting socket: open a dedicated Redis subscription and join
// the socket to the room named by the `id` of each incoming message. The
// subscription is released when the socket disconnects.
io.on('connect', async socket => {
    socket.emit('test', 'from server');

    let onDisconnected;
    try {
        onDisconnected = await createSubscriber(data => {
            // Ignore payloads that carry no room id instead of throwing
            // inside the Redis message handler.
            if (data && data.id != null) {
                socket.join(data.id);
            }
        });
    } catch (error) {
        console.error(error); // redis has failed to connect
        // NOTE(review): 'error' may be a reserved event name in some Socket.IO
        // versions — confirm, or switch to a custom event name.
        socket.emit('error', error.message);
        socket.disconnect(); // disconnect the socket since there is nothing to do
        return;
    }

    // The socket may have gone away while the subscription was being set up.
    // Its 'disconnect' event has then already fired, so a listener added now
    // would never run; release the Redis client immediately instead.
    if (!socket.connected) {
        await onDisconnected();
        return;
    }

    socket.on('disconnect', () => {
        onDisconnected().catch(err => console.error(err));
    });
});

// Log every room join reported by the default namespace's adapter.
io.of('/').adapter.on('join-room', function logRoomJoin(joinedRoom, joinedSocketId) {
    console.log(`socket ${joinedSocketId} has joined room ${joinedRoom}`);
});



Upvotes: 0

Related Questions