Nemoden

Reputation: 9056

kafka-node asynchronous consumer handler

This is how my consumer is initialised:

const client = new kafka.Client(config.ZK_HOST)
const consumer = new kafka.Consumer(client, [{ topic: config.KAFKA_TOPIC, offset: 0}],
{
    autoCommit: false
})

The message handler is registered like this:

consumer.on('message', message => applyMessage(message))

The thing is applyMessage talks to the database using knex, the code looks something like:

async function applyMessage(message: kafka.Message) {
    const usersCount = await db('users').count()
    // just assume we ABSOLUTELY need to calculate a number of users,
    // so we need previous state
    await db('users').insert(inferUserFromMessage(message))
}

With the code above, applyMessage runs concurrently for all the messages in Kafka. Given that there are no users in the database yet, usersCount will ALWAYS be 0, even for the second message, where it should already be 1 because the first call to applyMessage inserts a user.
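To make the problem concrete, here is a minimal sketch that reproduces it without Kafka or knex. The in-memory `users` array and the `countUsers`, `insertUser` and `delay` helpers are hypothetical stand-ins for the real database calls; only the read-then-insert shape of applyMessage is taken from my code.

```javascript
// Hypothetical in-memory "users" table; the delays stand in for database round-trips.
const users = [];
const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function countUsers() {
  await delay(10);
  return users.length;
}

async function insertUser(user) {
  await delay(10);
  users.push(user);
}

// Same shape as the real applyMessage: read the count, then insert.
async function applyMessage(message) {
  const usersCount = await countUsers();
  await insertUser({ name: message.value });
  return usersCount;
}

// An event emitter does not wait for the promise a handler returns, so both
// calls start before either insert has finished.
function demo() {
  return Promise.all([
    applyMessage({ value: 'a' }),
    applyMessage({ value: 'b' }),
  ]);
}

const result = demo();
result.then((counts) => console.log(counts)); // prints [ 0, 0 ]
```

Both calls read the count before either insert lands, which is exactly what I see with real messages.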

How do I "synchronise" the code so that all the applyMessage calls run sequentially?

Upvotes: 1

Views: 1457

Answers (1)

pmar

Reputation: 314

You'll need to implement some sort of mutex: basically a class that queues up tasks and executes them one at a time. For example:

var Mutex = function() {
  this.queue = [];
  this.locked = false;
};

Mutex.prototype.enqueue = function(task) {
  this.queue.push(task);
  if (!this.locked) {
    this.dequeue();
  }
};

Mutex.prototype.dequeue = function() {
  this.locked = true;
  const task = this.queue.shift();
  if (task) {
    this.execute(task);
  } else {
    this.locked = false;
  }
};

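Mutex.prototype.execute = async function(task) {
  try {
    await task();
  } catch (err) {
    // Log the failure instead of silently swallowing it; the queue keeps draining.
    console.error(err);
  }
  this.dequeue();
};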
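As a quick check that the queue really serialises work, here is a small self-contained sketch. The Mutex is repeated from above (with errors logged) so the snippet runs on its own; `makeTask`, `delay`, `log` and `done` are hypothetical names used only for the demonstration.

```javascript
function Mutex() {
  this.queue = [];
  this.locked = false;
}

Mutex.prototype.enqueue = function (task) {
  this.queue.push(task);
  if (!this.locked) {
    this.dequeue();
  }
};

Mutex.prototype.dequeue = function () {
  this.locked = true;
  const task = this.queue.shift();
  if (task) {
    this.execute(task);
  } else {
    this.locked = false;
  }
};

Mutex.prototype.execute = async function (task) {
  try {
    await task();
  } catch (err) {
    console.error(err);
  }
  this.dequeue();
};

const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
const log = [];

// Builds a task that records when it starts and when it ends.
function makeTask(name, ms) {
  return async function () {
    log.push('start ' + name);
    await delay(ms);
    log.push('end ' + name);
  };
}

const mutex = new Mutex();
mutex.enqueue(makeTask('slow', 30));
mutex.enqueue(makeTask('fast', 5));

// The Mutex has no completion signal, so a final task resolves a promise.
const done = new Promise((resolve) => {
  mutex.enqueue(async () => resolve(log));
});

done.then((entries) => console.log(entries.join(', ')));
// prints: start slow, end slow, start fast, end fast
```

Even though the second task is much shorter, it does not start until the first one has finished.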

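For this to work, the task passed to the mutex must return a Promise that settles once the work is done. An async function already does that, so the original applyMessage qualifies as written; the version below simply spells the Promise out explicitly, with async moved from the outer function to the Promise executor: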

function applyMessage(message: kafka.Message) {
  return new Promise(async function(resolve,reject) {
    try {
      const usersCount = await db('users').count()
      // just assume we ABSOLUTELY need to calculate a number of users,
      // so we need previous state
      await db('users').insert(inferUserFromMessage(message))
      resolve();
    } catch (err) {
      reject(err);
    }
  });
}

Finally, each invocation of applyMessage needs to be added to the mutex queue instead of being called directly:

var mutex = new Mutex();
consumer.on('message', message => mutex.enqueue(function() { return applyMessage(message); }))
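If a full queue class feels heavy, the same one-at-a-time behaviour can be sketched with a plain promise chain. This is a different technique from the Mutex above, not a drop-in copy of it. The `EventEmitter` below is only a stand-in for the kafka-node consumer, and the in-memory `users` array, `delay` and `seenCount` are hypothetical names that replace the real database.

```javascript
const { EventEmitter } = require('events');

// Stand-in for the kafka-node consumer: anything that emits 'message' events.
const consumer = new EventEmitter();

// Hypothetical in-memory table; the delays simulate database round-trips.
const users = [];
const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function applyMessage(message) {
  await delay(10); // simulated count query
  const usersCount = users.length;
  await delay(10); // simulated insert
  users.push({ name: message.value, seenCount: usersCount });
}

// Tail of the chain: each new message waits for the previous one to finish.
let tail = Promise.resolve();

consumer.on('message', (message) => {
  tail = tail
    .then(() => applyMessage(message))
    // Catch here so one failed message does not block the ones behind it.
    .catch((err) => console.error('applyMessage failed:', err));
});

consumer.emit('message', { value: 'a' });
consumer.emit('message', { value: 'b' });

tail.then(() => console.log(users.map((u) => u.seenCount))); // prints [ 0, 1 ]
```

The second message now sees the user inserted by the first. Note that either approach only orders work inside a single process; messages handled by other consumers are not covered.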

Upvotes: 3
