Reputation: 71
I have problem implementing await inside event handler. For some reason, event handler doesn't wait for first process to finish before starting new one. Why is this weird behaviour so?
const { EventEmitter } = require("events");
let alreadyRunning = false;
const sampleEventHandler = new EventEmitter();
sampleEventHandler.on("data", async (message) => {
await heavyProcess(message);
});
async function heavyProcess(message) {
console.log("New message: ", message);
console.log("already running?: ", alreadyRunning);
if (alreadyRunning) {
console.log("Why doesn't it await for first task to complete fully before entering here?");
}
const rand = Math.random() * 1000;
// set var here
alreadyRunning = true;
await sleep(rand);
// unset here
alreadyRunning = false;
}
function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
// Emit event 5 times
for (let i = 0; i < 5; i++) {
sampleEventHandler.emit("data", i);
}
Upvotes: 6
Views: 4882
Reputation: 1426
Edit:
The reason why it is not discarded is because you are doing console.log
only. Add a return under it.
async function heavyProcess(message) {
console.log("New message: ", message);
console.log("already running?: ", alreadyRunning);
if (alreadyRunning) {
console.log("Why doesn't it await for first task to complete fully before entering here?");
return // Actually stop executing
}
const rand = Math.random() * 1000;
// set var here
alreadyRunning = true;
await sleep(rand);
// unset here
alreadyRunning = false;
}
Edit 2: Following the comments: the required behaviour of OP is that the heavyProcess should only be happening once and only start a new one if the process has finished.
Every event is now starting a new heavyProcess as await just waits for a result, it is not blocking.
You'll have to introduce the blocking yourself in the event (or use RXJS exhaustMap or something similar).
let isProcessing = false;
sampleEventHandler.on("data", (message) => {
if (isProcessing) { return; }
isProcessing = true;
const resetProcessing = () => isProcessing = false
heavyProcess(message).then(resetProcessing, resetProcessing); // Reset of both complete and error
Upvotes: 1
Reputation: 2295
You need to promisify heavyProcess
function in order to using await with it successfuly:
function heavyProcess(message) {
return new Promise( async (resolve, reject) => {
console.log("New message: ", message);
console.log("already running?: ", alreadyRunning);
if (alreadyRunning) {
console.log("Why doesn't it await for first task to complete fully before entering here?");
}
const rand = Math.random() * 1000;
// set var here
alreadyRunning = true;
await sleep(rand);
// unset here
alreadyRunning = false;
resolve('Done');
});
}
Upvotes: 1