Reputation: 1121
I'm using redis pub/sub and I want to join a room after the redis sub event:
// Question snippet: an Express + Socket.IO server on port 3001 that listens to
// Redis pub/sub and tries to put each connecting socket into a room whose name
// arrives over Redis.
const express = require('express');
const app = express();
const socketio = require('socket.io');
const redis = require("redis");
const expressServer = app.listen(3001, () => console.log("Express is running!"));
const io = socketio(expressServer);
// Redis client used as the pub/sub subscriber.
const sub = redis.createClient({port: 6379, host: '127.0.0.1'});
// Only the 'message' channel is subscribed here.
sub.subscribe('message');
// Module-level room name shared by every socket connection; it stays undefined
// until the 'message' handler below assigns it.
let room;
sub.on('error', function (error) {
console.log('ERROR ' + error);
});
sub.on('connect', function(){
console.log('Redis client connected');
});
// Handle messages published on the subscribed Redis channel(s): parse the JSON
// payload and remember its id as the current room.
sub.on('message', async function (channel, data) {
data = JSON.parse(data);
// NOTE(review): only 'message' is subscribed above, so this 'conversation'
// check looks like it can never be true — confirm which channel is intended.
if(channel === 'conversation'){
room = data.id;
console.log('room set ' + room)
}
});
io.on('connect', (socket) => {
socket.emit('test', 'from server')
// Joins whatever `room` holds at connection time; this is undefined if no
// Redis message has assigned it yet.
socket.join(room);
// A new 'join-room' listener is registered on the adapter for every connection.
// The callback's `room` parameter shadows the module-level `room`.
io.of("/").adapter.on("join-room", (room, id) => {
console.log(`socket ${id} has joined room ${room}`);
});
});
Basically, what I want is for `sub.on('message')` to set the `room` state, and only after that to call `socket.join(room);`. What's happening right now is that the socket server connects faster than the Redis sub event arrives. On the first sub event the socket server joins an undefined room, and on the next it joins the last room set by `sub.on('message')`.
Is there a way to use the Node event emitter so that a socket room is joined only after the room has been set by Redis? Or how would I use the Socket.IO Redis adapter (socket.io-redis) to do this?
It looks like there's a way to do this with the new Socket.IO Redis adapter functions, but there are no docs on how to do it. Function: RedisAdapter.remoteJoin(id, room)
I started trying to do this with `remoteJoin` on the socket adapter. The problem is the `socket.id`:
// Question snippet, second attempt: same server as before, but using the
// Socket.IO Redis adapter and calling remoteJoin() from the Redis 'message'
// handler instead of calling socket.join() at connection time.
const express = require('express');
const app = express();
const socketio = require('socket.io');
const redis = require("redis");
const redisAdapter = require('@socket.io/redis-adapter');
const expressServer = app.listen(3001, () => console.log("Express is running!"));
const io = socketio(expressServer);
const sub = redis.createClient({port: 6379, host: '127.0.0.1'});
// Second connection with the same options, handed to the adapter as its publisher.
const pub = sub.duplicate();
// NOTE(review): confirm that the installed adapter version exports a callable
// factory; some releases may expose it as a named `createAdapter` export instead.
io.adapter(redisAdapter(pub, sub));
// Subscribes to both the 'conversation' and 'message' channels.
sub.subscribe('conversation','message', function(){
// console.log('Subbed')
})
//sub.subscribe('conversation','message');
// Last room id received over Redis.
let room;
// NOTE(review): declared as `soketId`, but the code below assigns and reads
// `socketId`; this variable is therefore never used and `socketId` is assigned
// without a declaration.
let soketId;
sub.on('error', function (error) {
console.log('ERROR ' + error);
});
sub.on('connect', function(){
console.log('Redis client connected');
});
// On each 'conversation' message, remember the room id and ask the adapter to
// add the stored socket id to that room.
sub.on('message', async function (channel, data) {
data = JSON.parse(data);
if(channel === 'conversation'){
room = data.id;
// Uses whatever socket id the most recent connection stored.
io.of('/').adapter.remoteJoin(socketId, room);
console.log('subbed ' + room)
}
});
io.on('connect', (socket) => {
// Overwritten on every connection, so only the newest socket's id is kept.
socketId = socket.id;
// A new 'join-room' listener is registered on the adapter for every connection.
// The callback's `room` parameter shadows the module-level `room`.
io.of("/").adapter.on("join-room", (room, id) => {
console.log(`socket ${id} has joined room ${room}`);
});
});
The problem is that `socketId = socket.id;` is set to the last client that connected. So all clients become the last client that connected.
Upvotes: 0
Views: 1301
Reputation: 927
I have refactored your code to react to incoming messages from Redis pub/sub.
This code can be reduced with the new Redis client or ioredis client which has a promise-based API.
Whenever a new client is connected, you need to subscribe for messages on Redis so that you can react to new messages while the client is connected.
The problem is when the socket is disconnected, you no longer need to receive those messages, so I created a helper function which does the following:
// Third-party modules used by the server.
const express = require('express');
const socketio = require('socket.io');
const redis = require('redis');

// HTTP layer: an Express app listening on port 3001 that logs once it is up.
const app = express();
const expressServer = app.listen(3001, function onListening() {
  console.log('Express is running!');
});

// Socket.IO is attached to the same HTTP server that Express listens on.
const io = socketio(expressServer);
/**
 * Opens a dedicated Redis connection, subscribes it to `channel`, and calls
 * `cb` with the JSON-decoded payload of every message published there.
 *
 * @param {(data: any) => void} cb - Receives each successfully parsed message.
 * @param {string} [channel='conversation'] - Pub/sub channel to listen on.
 * @returns {Promise<() => Promise<void>>} Resolves with a teardown function
 *   that unsubscribes from `channel` and closes the connection.
 * @throws Rejects with the Redis error if connecting or subscribing fails.
 */
const createSubscriber = async (cb, channel = 'conversation') => {
  const client = redis.createClient({ port: 6379, host: '127.0.0.1' });

  // Wait for the connection. The error listener stays registered afterwards,
  // so a later connection error also closes this client.
  await new Promise((resolve, reject) => {
    client.on('connect', resolve);
    client.on('error', reason => {
      client.end(true);
      reject(reason);
    });
  });

  // Subscribe to the same channel the message handler filters on; subscribing
  // to one channel and filtering on another would never invoke `cb`.
  await new Promise((resolve, reject) => {
    client.subscribe(channel, err => {
      if (err) {
        client.end(true);
        reject(err);
      } else {
        resolve();
      }
    });
  });

  client.on('message', (source, message) => {
    if (source !== channel) {
      return;
    }
    let payload;
    try {
      payload = JSON.parse(message);
    } catch (err) {
      // A malformed payload must not throw out of the event handler.
      console.error(err);
      return;
    }
    cb(payload);
  });

  // Teardown: always resolves, even if unsubscribing reports an error.
  return async () => {
    await new Promise(resolve => {
      client.unsubscribe(channel, err => {
        if (err) {
          console.error(err);
        }
        client.end(true);
        resolve();
      });
    });
  };
};
// For every new socket: greet it, open a per-socket Redis subscription, and
// join the socket to each room id announced over Redis while it is connected.
io.on('connect', async socket => {
  socket.emit('test', 'from server');
  try {
    const onDisconnected = await createSubscriber(data => {
      // Skip payloads without an id instead of throwing or joining "undefined".
      if (data && data.id != null) {
        socket.join(data.id);
      }
    });
    // The socket may have dropped while the subscriber was being set up. In
    // that case 'disconnect' has already fired and would never trigger the
    // cleanup below, so release the Redis connection immediately.
    if (!socket.connected) {
      await onDisconnected();
      return;
    }
    socket.on('disconnect', async () => {
      await onDisconnected();
    });
  } catch (error) {
    console.error(error); // redis has failed to connect
    socket.emit('error', error.message);
    socket.disconnect(); // disconnect the socket since there is nothing to do
  }
});
// Log each 'join-room' event emitted by the default namespace's adapter.
// The listener is registered once, at module level.
io.of('/').adapter.on('join-room', function logRoomJoin(roomName, socketId) {
  console.log(`socket ${socketId} has joined room ${roomName}`);
});
Upvotes: 0