Thibault Cuvillie

Reputation: 69

kafka consumer and async handler

I have a Node.js app with a Kafka subscriber. The subscription handler uses "fetch" to call a remote REST API (await fetch(...)).

When I try to handle a high frequency of messages, the REST calls fail because the remote server is overloaded.

The overload happens because the subscriber handler is asynchronous.

My question is: is there a way to ensure that the async handlers are serialized, so that there are no simultaneous calls to the remote API server?

@Chris: I am using kafka-node

Here is a code sample:

const consumer = new Consumer(this.client, [{ topic: topicKey }]);
consumer.on('message', function (message) {
  handleMessage(message);
});

async function handleMessage(message) {
  // ... decode the message

  // Send to the remote server using a REST call.
  // => the task is suspended waiting for the IO, so meanwhile the next
  //    message is processed, and I flood the remote server with POST requests.
  await fetch(...);
}

Thanks.

Upvotes: 5

Views: 5232

Answers (2)

vasyaod
vasyaod

Reputation: 64

If you just want to suspend the "flow" until your async code has finished executing, the following approach can work pretty well, since a readable stream (ConsumerGroupStream) can be paused and resumed.

    const kafka = require('kafka-node')

    const options = {
      kafkaHost: '127.0.0.1:9092',
      groupId: 'Group'
    };

    const consumerGroupStream = new kafka.ConsumerGroupStream(options, ['queue']);

    consumerGroupStream
      .on('data', (message) => {
        consumerGroupStream.pause();
        console.log('Message: ', message, Date.now());
        asyncFunction().then(() => {
          console.log('Now data will start flowing again.', Date.now());
          consumerGroupStream.resume();
        });
      });

The second option is to use a transform stream: https://nodejs.org/api/stream.html#stream_class_stream_transform

    const kafka = require('kafka-node')

    const options = {
      kafkaHost: '127.0.0.1:9092',
      groupId: 'Group'
    };

    const consumerGroupStream = new kafka.ConsumerGroupStream(options, ['queue']);

    async function asyncFunction(message) {
      console.log('Message: ', message);
    }

    const Transform = require('stream').Transform

    const messageTransform = new Transform({
      objectMode: true,
      decodeStrings: true,
      transform (message, encoding, done) {
        asyncFunction(message).then(() => {
          done()
        })
      }
    })

    consumerGroupStream
      .pipe(messageTransform)

Upvotes: 1

Vashnak
Vashnak

Reputation: 362

I'm not sure exactly what you want to achieve. I understand that your API is overloaded because you make too many simultaneous calls to it.

So, if my understanding is correct, you want to process the messages synchronously, one at a time.

As I said in my comment, I think a queue is a good option. Here is how I would do it (you will probably find a better way to implement a queue somewhere else, but this just gives you the idea :D)

const consumer = new Consumer(this.client, [{ topic: topicKey}]);
const myQueue = [];

consumer.on('message', function (message) {
    myQueue.push(message);
});

async function consumeQueue(){
    const message = myQueue.shift();

    if(!message){
        // Nothing queued yet: wait a bit before checking again.
        await sleep(3000);
    } else {
        // ... decode your message

        // One REST call at a time, awaited before touching the next message.
        await fetch(...);
    }

    // The await above yields to the event loop, so this recursion
    // does not grow the stack.
    consumeQueue();
}

function sleep(ms){
    return new Promise(resolve => setTimeout(resolve, ms));
}

// you have to init it :D
consumeQueue();
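A variant of the same idea that avoids the 3-second polling (my addition, not from the answer): chain each message onto the tail of a single promise, so handlers run strictly one after another. Self-contained sketch; handleMessage is a stand-in that simulates the decode + REST call:

```javascript
let tail = Promise.resolve();
const processed = [];

// Simulates the decode + `await fetch(...)` step from the answer.
async function handleMessage(message) {
  await new Promise((resolve) => setTimeout(resolve, 10));
  processed.push(message);
}

// Call this from consumer.on('message', ...): each handler starts only
// after the previous one has settled, so the REST calls are serialized.
function enqueue(message) {
  tail = tail.then(() => handleMessage(message));
  return tail;
}

// Simulate a burst of messages arriving at once.
['a', 'b', 'c'].forEach(enqueue);
tail.then(() => console.log(processed.join(','))); // prints "a,b,c"
```

The trade-off versus the polling queue is that messages are picked up immediately, but the `tail` promise can grow a long chain under sustained load; a real queue gives you more control (e.g. bounding its length).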

Upvotes: 4
