Viswanath Kapavarapu

Reputation: 71

How to implement async/await in event handler?

I have a problem using await inside an event handler. For some reason, the event handler doesn't wait for the first process to finish before starting a new one. Why does it behave this way?

const { EventEmitter } = require("events");

let alreadyRunning = false;

const sampleEventHandler = new EventEmitter();
sampleEventHandler.on("data", async (message) => {
  await heavyProcess(message);
});

async function heavyProcess(message) {
  console.log("New message: ", message);
  console.log("already running?: ", alreadyRunning);
  if (alreadyRunning) {
    console.log("Why doesn't it await for first task to complete fully before entering here?");
  }

  const rand = Math.random() * 1000;
  // set var here
  alreadyRunning = true;
  await sleep(rand);
  // unset here
  alreadyRunning = false;
}

function sleep(ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}

// Emit event 5 times
for (let i = 0; i < 5; i++) {
  sampleEventHandler.emit("data", i);
}

Upvotes: 6

Views: 4882

Answers (2)

Jeffrey Devloo

Reputation: 1426

Edit: The call is not discarded because the if block only logs a message, so execution continues past it. Add a return after the console.log.

async function heavyProcess(message) {
  console.log("New message: ", message);
  console.log("already running?: ", alreadyRunning);
  if (alreadyRunning) {
    console.log("Why doesn't it await for first task to complete fully before entering here?");
    return; // Actually stop executing
  }

  const rand = Math.random() * 1000;
  // set var here
  alreadyRunning = true;
  await sleep(rand);
  // unset here
  alreadyRunning = false;
}

Edit 2: Following the comments: the behaviour the OP wants is that only one heavyProcess runs at a time, and a new one starts only after the current one has finished.

As written, every event starts a new heavyProcess, because await only suspends the async function it appears in until the result is ready. It does not block the emitter or any other handler call.
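To make that concrete, here is a minimal sketch (the `log` array and `snapshot` are illustrative names, not part of the question's code). EventEmitter calls listeners synchronously and ignores their return value, so the promise returned by an async listener is never awaited by emit():

```javascript
const { EventEmitter } = require("events");

const emitter = new EventEmitter();
const log = []; // illustrative: records the order in which things happen

emitter.on("data", async (message) => {
  log.push(`start ${message}`);
  // The listener pauses here, but emit() has already moved on.
  await new Promise((resolve) => setTimeout(resolve, 10));
  log.push(`end ${message}`);
});

emitter.emit("data", 0);
emitter.emit("data", 1);
log.push("emit calls returned");

// Snapshot of what has happened by the time both emit() calls have returned.
const snapshot = [...log];
console.log(snapshot);
```

Both listeners have started before either has finished, which is the overlap seen in the question.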

You'll have to introduce the blocking yourself in the event handler (or use RxJS exhaustMap or something similar).

let isProcessing = false;

sampleEventHandler.on("data", (message) => {
  if (isProcessing) { return; }
  isProcessing = true;
  const resetProcessing = () => { isProcessing = false; };
  heavyProcess(message).then(resetProcessing, resetProcessing); // Reset on both completion and error
});
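Note that the guard above drops any event that arrives while a process is running. If the events should instead be handled one after another, one option is to chain each call onto the previous promise. This is only a sketch under that assumption: `queue`, `enqueue`, and `order` are hypothetical names introduced here, and heavyProcess is a shortened stand-in for the one in the question.

```javascript
const { EventEmitter } = require("events");

const emitter = new EventEmitter();
const order = []; // illustrative: records when each task starts and ends

function sleep(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

// Stand-in for the question's heavyProcess.
async function heavyProcess(message) {
  order.push(`start ${message}`);
  await sleep(10);
  order.push(`end ${message}`);
}

// Tail of the chain: each new task starts only after the previous one settles.
let queue = Promise.resolve();

function enqueue(task) {
  const run = queue.then(task);
  // Swallow errors on the chain itself so one failure does not block later tasks.
  queue = run.catch(() => {});
  return run;
}

emitter.on("data", (message) => {
  enqueue(() => heavyProcess(message)).catch((err) => {
    console.error("task failed:", err);
  });
});

for (let i = 0; i < 3; i++) {
  emitter.emit("data", i);
}
```

With this approach no event is lost, but the chain grows without bound if events arrive faster than they are processed, so it may need a size limit in practice.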

Upvotes: 1

MEDZ

Reputation: 2295

You need to promisify the heavyProcess function in order to use await with it successfully:

function heavyProcess(message) {
    return new Promise( async (resolve, reject) => {
        console.log("New message: ", message);
        console.log("already running?: ", alreadyRunning);
        if (alreadyRunning) {
            console.log("Why doesn't it await for first task to complete fully before entering here?");
        }

        const rand = Math.random() * 1000;
        // set var here
        alreadyRunning = true;
        await sleep(rand);
        // unset here
        alreadyRunning = false;
        resolve('Done');
    });
}

Upvotes: 1
