version 2
version 2

Reputation: 1059

Node.js socket.io server pushes the same data to all users

My socket.io implementation is wrong. I am trying to implement long polling on MySQL data.

For this demo, I'm fetching user data for each user connected to the long-polling server I created. Each user passes their unique user_id to the server to fetch the data related to them. (In the real app, I want to fetch user notifications.)

Users connect to the server, and the server long-polls the DB. The problem is that the same data is pushed back to every client, even though I'm passing unique ids from the client side.

User 1 passes id 5 and user 2 passes id 3. Both users get the same data in the response (user 1's data).

Here's the client side code:

// hardcoded for now; in the real app the id is read from a form
var user_id = 5;

// sent to the server as the handshake query, where it is read as `query.userId`
var params = { userId: user_id };

// open the socket.io connection to the polling server
var socket = io.connect('http://localhost:8000', { query: params });

// log every 'notification' payload pushed by the server
socket.on('notification', function logNotification(payload) {
    console.log(payload);
});

Here's the server side code:

// HTTP server (served by `handler`) with socket.io attached to it
var app = require('http').createServer(handler);
var io = require('socket.io').listen(app);
var fs = require('fs');
var mysql = require('mysql');

// sockets of the clients that are currently connected
var connectionsArray = [];

// the MySQL connection used by the polling query
var connection = mysql.createConnection({
    host: 'localhost',
    user: 'root',
    password: '',
    database: 'dumydb',
    port: 3306
});

// delay in ms between two polls, and the handle of the pending poll timer
var POLLING_INTERVAL = 5000;
var pollingTimer;

// open the database connection; a failure is only logged
connection.connect(function (connectError) {
    if (!connectError) {
        return;
    }
    console.log(connectError);
});

// creating the server ( localhost:8000 )
app.listen(8000);

/**
 * HTTP request listener: answers every request with the contents of
 * client.html (status 200), or with status 500 and a short error text when
 * the file cannot be read.
 *
 * @param {Object} req - incoming request (not inspected)
 * @param {Object} res - response; only writeHead() and end() are used
 */
function handler(req, res) {
    var pagePath = __dirname + '/client.html';

    fs.readFile(pagePath, function (readError, page) {
        if (!readError) {
            res.writeHead(200);
            res.end(page);
            return;
        }

        console.log(readError);
        res.writeHead(500);
        res.end('Error loading client.html');
    });
}

// Handles every new socket.io connection.
// The polling loop is started only when no socket is registered yet, and it is
// started with the userId of that first client; later connections are just
// added to connectionsArray.
io.sockets.on('connection', function onConnection(socket) {

    var userId = socket.handshake.query.userId;
    var isFirstConnection = connectionsArray.length === 0;

    if (isFirstConnection) {
        pollingLoop(userId);
    }

    // forget the socket once the client goes away
    socket.on('disconnect', function onDisconnect() {
        var position = connectionsArray.indexOf(socket);
        console.log('socketID = %s got disconnected', position);
        if (position !== -1) {
            connectionsArray.splice(position, 1);
        }
    });

    console.log('A new socket is connected!');
    connectionsArray.push(socket);

});

/**
 * Runs the user query once, hands the collected rows to updateSockets() and
 * re-schedules itself after POLLING_INTERVAL ms for as long as
 * connectionsArray is not empty.
 *
 * @param userId - value bound to the `id=?` placeholder of the query
 */
var pollingLoop = function (userId) {
    var rows = []; // rows streamed back by the query
    var query = connection.query(`SELECT full_name FROM users WHERE id=?`, [userId]);

    // forward the error to updateSockets(); 'end' is emitted after this as well
    function onError(err) {
        updateSockets(err);
    }

    // called once per row
    function onRow(row) {
        rows.push(row);
    }

    function onEnd() {
        // nobody is listening any more: do not schedule another poll
        if (!connectionsArray.length) {
            console.log('The server timer was stopped because there are no more socket connections on the app')
            return;
        }

        pollingTimer = setTimeout(function () {
            pollingLoop(userId);
        }, POLLING_INTERVAL);

        updateSockets({ users: rows });
    }

    query.on('error', onError).on('result', onRow).on('end', onEnd);
};

/**
 * Stamps `data` with the current time and emits it as a volatile
 * 'notification' event on every socket in connectionsArray.
 *
 * @param {Object} data - payload; its `time` property is overwritten
 */
var updateSockets = function (data) {
    data.time = new Date();

    for (var i = 0; i < connectionsArray.length; i += 1) {
        connectionsArray[i].volatile.emit('notification', data);
    }
};

console.log('Please use your browser to navigate to http://localhost:8000');

This is the tutorial I'm following: Link

Can anyone help me figure out why same data is being pushed to all users? Thanks!

UPDATE 1

I tried the solutions provided by Theo and Abdul Rab Memon and changed my updateSockets(), but now the data is pushed only to the first client that connected. I added a console.log(); take a look at this:

enter image description here

UPDATE 2: package.json

{
  "name": "nodejs-MySQL-push-notifications-demo",
  "version": "1.0.0",
  "description": "nodejs-MySQL-push-notifications-demo",
  "main": "server.js",
  "dependencies": {
    "express": "^4.16.2",
    "mysql": "~2.5.4",
    "socket.io": "~1.3.2"
  },
  "devDependencies": {},
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1",
    "start": "node server.js"
  },
  "repository": {
    "type": "git",
    "url": "git://github.com/GianlucaGuarini/nodejs-MySQL-push-notifications-demo.git"
  },
  "author": "Gianluca Guarini",
  "license": "BSD-2-Clause",
  "bugs": {
    "url": "https://github.com/GianlucaGuarini/nodejs-MySQL-push-notifications-demo/issues"
  },
  "homepage": "https://github.com/GianlucaGuarini/nodejs-MySQL-push-notifications-demo"
}

Upvotes: 8

Views: 2000

Answers (2)

Abdul Rab Memon
Abdul Rab Memon

Reputation: 426

// HTTP server (served by `handler`) with socket.io attached to it
var app = require('http').createServer(handler);
var io = require('socket.io').listen(app);
var fs = require('fs');
var mysql = require('mysql');

// sockets of the clients that are currently connected
var connectionsArray = [];

// the MySQL connection used by the polling query
var connection = mysql.createConnection({
    host: 'localhost',
    user: 'root',
    password: '',
    database: 'dumydb',
    port: 3306
});

// delay in ms between two polls, and a slot for the poll timer handle
var POLLING_INTERVAL = 5000;
var pollingTimer;

// open the database connection; a failure is only logged
connection.connect(function (connectError) {
    if (!connectError) {
        return;
    }
    console.log(connectError);
});

// creating the server ( localhost:8000 )
app.listen(8000);

/**
 * HTTP request listener: answers every request with the contents of
 * client.html (status 200), or with status 500 and a short error text when
 * the file cannot be read.
 *
 * @param {Object} req - incoming request (not inspected)
 * @param {Object} res - response; only writeHead() and end() are used
 */
function handler(req, res) {
    var pagePath = __dirname + '/client.html';

    fs.readFile(pagePath, function (readError, page) {
        if (!readError) {
            res.writeHead(200);
            res.end(page);
            return;
        }

        console.log(readError);
        res.writeHead(500);
        res.end('Error loading client.html');
    });
}

// creating a new websocket to keep the content updated without any AJAX request
io.sockets.on('connection', function (socket) {

    // id the client sent in the handshake query ({ query: { userId: ... } })
    var userId = socket.handshake.query.userId;
    connectionsArray.push(socket);
    console.log('A new socket is connected!');

    // Every client gets its own polling loop, bound to its own socket and its
    // own userId. The socket object itself is passed (not its array index):
    // an index goes stale as soon as another socket is spliced out.
    pollingLoop(userId, socket);

    socket.on('disconnect', function () {
        var socketIndex = connectionsArray.indexOf(socket);
        console.log('socketID = %s got disconnected', socketIndex);
        if (~socketIndex) {
            // removing the socket is what makes its polling loop stop
            connectionsArray.splice(socketIndex, 1);
        }
    });

});

/**
 * Polls the database for one user and pushes the result to that user's
 * socket only. Re-schedules itself every POLLING_INTERVAL ms for as long as
 * the socket is still listed in connectionsArray.
 *
 * @param userId - value bound to the `id=?` placeholder of the query
 * @param {Object} socket - socket.io socket that receives the result
 */
var pollingLoop = function (userId, socket) {
    var params = [userId];
    // Doing the database query
    var tempQuery = `SELECT full_name FROM users WHERE id=?`;
    var query = connection.query(tempQuery, params),
        users = []; // this array will contain the result of our db query

    // setting the query listeners
    query
        .on('error', function (err) {
            // Handle error, and 'end' event will be emitted after this as well
            updateSockets(err, socket);
        })
        .on('result', function (user) {
            // it fills our array looping on each user row inside the db
            users.push(user);
        })
        .on('end', function () {
            // keep looping only while THIS socket is still connected
            if (connectionsArray.indexOf(socket) === -1) {
                console.log('Polling for user %s was stopped because its socket got disconnected', userId);
                return;
            }

            updateSockets({
                users: users
            }, socket);

            // pass the socket along again, otherwise the next round has no target
            setTimeout(function () { pollingLoop(userId, socket); }, POLLING_INTERVAL);
        });
};

/**
 * Stamps `data` with the current time and emits it as a volatile
 * 'notification' event on the given socket only.
 *
 * @param {Object} data - payload; its `time` property is overwritten
 * @param {Object} socket - socket.io socket to send to
 */
var updateSockets = function (data, socket) {
    // adding the time of the last update
    data.time = new Date();
    // sending new data only to the socket this poll belongs to
    socket.volatile.emit('notification', data);
};

console.log('Please use your browser to navigate to http://localhost:8000');

In the updateSockets method you're iterating over connectionsArray and emitting the 'notification' event, with the same data, on every socket in connectionsArray. Instead, you need to pass the socket from the io.sockets.on('connection', callback(socket)) handler to the pollingLoop method, and from pollingLoop to the updateSockets method.

Upvotes: 6

Theo
Theo

Reputation: 2042

Since you mentioned that you want to fetch data related to the user, you need to emit to the specific client socket. To do this, you need to save the socket.id of the client every time they connect. You can do it with something like this:

// maps a userId to { socket_id } of the socket that user is connected with
var userList = {};

io.on('connection', (socket) => {
   var userId = socket.handshake.query.userId;

   // create the entry; assigning a property on a missing entry would throw
   userList[userId] = { socket_id: socket.id };

   // drop the entry on disconnect, unless the user already reconnected
   // with a newer socket in the meantime
   socket.on('disconnect', () => {
      if (userList[userId] && userList[userId].socket_id === socket.id) {
         delete userList[userId];
      }
   });
});

After a successful poll, you can send to a specific user like this.
Note the additional parameter userId, which you can pass from pollingLoop:

/**
 * Stamps `data` with the current time and emits it as a 'notification'
 * event to the socket registered for `userId` in userList. Does nothing
 * when no socket is registered for that user.
 *
 * @param {Object} data - payload; its `time` property is overwritten
 * @param userId - key into userList
 */
var updateSockets = function (data, userId) {
    // adding the time of the last update
    data.time = new Date();

    // the user may not be registered (never connected, or already gone)
    var entry = userList[userId];
    if (!entry || !entry.socket_id) {
        return;
    }

    // sending new data to the specific socket connected
    io.sockets.to(entry.socket_id).emit('notification', data);
};

You may also try namespaces or rooms to send to a group. So instead of

// emits the same `data` on every socket stored in connectionsArray
connectionsArray.forEach(function (tmpSocket) {
    tmpSocket.volatile.emit('notification', data);
});

you can use namespaces

Client code

// the path part of the URL ('/your_name_space') selects the namespace to join
var socket = io.connect('http://localhost:8000/your_name_space', { query: params });

Server code

// emits 'notification' on the '/your_name_space' namespace
io.of('/your_name_space').emit('notification', data_to_be_sent);

or rooms (groups). Please refer to this link for more info on rooms: https://socket.io/docs/rooms-and-namespaces/

Upvotes: 3

Related Questions