Reputation: 1843
I have a Node.js cluster with a primary that drives worker cycles (in the while loop) and listens to worker messages to progress through the cycle. (In my real code, index.js does not send messages on setInterval but on another type of event; I have simplified the code here to capture the essence of the problem.)
Server.js
var cluster = require('cluster');
const ClusterMessages = require('cluster-messages');
const messages = new ClusterMessages();
// Primary process: forks one worker and runs the cycle loop.
// Every other process loads the worker script instead.
if (cluster.isMaster){
let worker = cluster.fork()
console.log(cluster);
// Endless cycle loop: log the cycle number, then wait for enough() to
// resolve before starting the next cycle.
(async ()=>{
let cycle = 0
while(true){
console.log(cycle);
cycle ++
await Promise.all([
enough(),
])
}
// Returns a promise that resolves when an 'enough' message is received.
// Note: each call registers a new listener with messages.on(), and nothing
// in this script ever removes it.
function enough () {
return new Promise(resolve => {
messages.on('enough', () => {
console.log('enough');
resolve()
});
});
}})()
} else {
require('./index.js')
}
Index.js
// Worker script: sends an 'enough' message every 1000 ms.
const ClusterMessages = require('cluster-messages');
const messages = new ClusterMessages();
setInterval(() => {
messages.send('enough');
}, 1000);
The code works fine (both in this example and in my real code), but there seems to be a memory leak, as you can see from the output of this code:
0
enough
1
enough
enough
2
enough
enough
enough
3
enough
enough
enough
enough...
I tried several things, such as swapping the order of new Promise and messages.on(), and adding a return in the callback of the promise, but I have no clue what is happening here. Any ideas?
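The solution is to relay the message through a separate event emitter whose listener can be registered to fire only once (with once), in contrast to the listener registered through the cluster-messages package, which stays attached.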
Server.js
if (cluster.isMaster) {
  // Primary process: start one worker and dump the cluster object for debugging.
  let worker = cluster.fork();
  console.log(cluster);

  // Local emitter used as a relay: the cluster-messages listener is attached
  // exactly once and re-emits every 'enough' message as a local 'event'.
  const EventEmitter = require('events');
  const eventEmitter = new EventEmitter();
  messages.on('enough', () => {
    eventEmitter.emit('event');
  });

  // Resolves on the next relayed 'event'. The listener is attached with
  // once(), so it is detached again as soon as it has fired.
  const enough = () =>
    new Promise((resolve) => {
      eventEmitter.once('event', () => {
        console.log('event');
        resolve();
      });
    });

  // Endless cycle loop: print the cycle counter, then wait for the next event.
  (async () => {
    let cycle = 0;
    for (;;) {
      console.log(cycle);
      cycle += 1;
      await Promise.all([enough()]);
    }
  })();
} else {
  // Worker process: run the worker script.
  require('./index.js');
}
Index.js
// Worker script (unchanged from the question): sends an 'enough' message
// every 1000 ms.
const ClusterMessages = require('cluster-messages');
const messages = new ClusterMessages();
setInterval(() => {
messages.send('enough');
}, 1000);
Upvotes: 1
Views: 1753
Reputation: 664444
Every call of `enough()` installs another listener for the `enough` event on `messages`. They never get removed, leaking memory (and leading to an increasing number of logs per event). Instead, use the `once` method to install the listener:
/**
 * Returns a promise that resolves the next time 'enough' is received.
 * The listener is registered with once() instead of on().
 * NOTE(review): assumes `messages` exposes an EventEmitter-style `once`
 * method — confirm against the cluster-messages API.
 */
function enough () {
return new Promise(resolve => {
messages.once('enough', () => {
// ^^^^
console.log('enough');
resolve();
});
});
}
Or even simpler, using `once` from the `events` module:
const { once } = require('events');
/**
 * Returns the promise produced by events.once() for the next 'enough' event
 * on `messages`.
 * NOTE(review): assumes `messages` is an EventEmitter — confirm against the
 * cluster-messages API.
 */
function enough() {
return once(messages, 'enough');
}
In your particular example, I would recommend not to use promises to handle the events. You might even miss events that are fired while you are removing and re-attaching a listener. Just write
// Promise-free variant: one listener is registered a single time; it logs
// the current cycle number and then increments it for each 'enough' message.
let cycle = 0
messages.on('enough', () => {
console.log(cycle);
cycle++;
});
If for some reason you need a loop that you can break from or `await` other things in, I would recommend an asynchronous iterator, built with `on` from the `events` module:
// `on` (not `once`) is the helper from the events module that is called
// below to build an async iterator over an emitter's events.
const { on } = require('events');
// Cycle loop driven by an async iterator: each iteration corresponds to one
// 'enough' event, so no listener is attached per cycle. Because it is an
// ordinary loop, it can be exited with `break` and other things can be
// awaited inside the body.
// NOTE(review): assumes `messages` is an EventEmitter — confirm against the
// cluster-messages API.
(async () => {
  let cycle = 0;
  // The event payload is not needed, hence the throwaway `_` binding.
  for await (const _ of on(messages, 'enough')) {
    console.log(cycle);
    cycle++;
  }
})();
Upvotes: 2