Reputation: 69
I have a Node.js app with a Kafka subscriber. The subscription handler uses fetch to call a remote REST API (await fetch(...)).
When I try to handle a high frequency of messages, the REST calls fail because the remote server is overloaded.
The overload happens because the subscriber handler is asynchronous.
My question is: is there a way to ensure that the async handlers are serialized, so there are no simultaneous calls to the remote API server?
Chris: I am using kafka-node
Here is a code sample:
const consumer = new Consumer(this.client, [{ topic: topicKey }]);

consumer.on('message', function (message) {
  handleMessage(message);
});

async function handleMessage(message) {
  // ... decode the message

  // Send to the remote server using a REST call
  // => the task is suspended while waiting for the I/O, so in the meantime the
  //    next message is processed, and I flood the remote server with POST requests.
  await fetch(...);
}
Thanks.
Upvotes: 5
Views: 5232
Reputation: 64
If you just want to suspend the "flow" until your async code has finished executing, the following approach can work pretty well, since a readable stream (ConsumerGroupStream) can be paused and resumed.
const kafka = require('kafka-node');

const options = { kafkaHost: '127.0.0.1:9092', groupId: 'Group' };
const consumerGroupStream = new kafka.ConsumerGroupStream(options, ['queue']);

consumerGroupStream.on('data', (message) => {
  consumerGroupStream.pause();
  console.log('Message: ', message, Date.now());
  asyncFunction().then(() => {
    console.log('Now data will start flowing again.', Date.now());
    consumerGroupStream.resume();
  });
});
A second option is to use a Transform stream: https://nodejs.org/api/stream.html#stream_class_stream_transform
const kafka = require('kafka-node');
const { Transform } = require('stream');

const options = { kafkaHost: '127.0.0.1:9092', groupId: 'Group' };
const consumerGroupStream = new kafka.ConsumerGroupStream(options, ['queue']);

async function asyncFunction(message) {
  console.log('Message: ', message);
}

const messageTransform = new Transform({
  objectMode: true,
  decodeStrings: true,
  transform(message, encoding, done) {
    asyncFunction(message).then(() => {
      done();
    });
  }
});

consumerGroupStream.pipe(messageTransform);
Upvotes: 1
Reputation: 362
I'm not sure exactly what you want to achieve. My understanding is that your API is overloaded because you make too many simultaneous calls to it.
So, if my understanding is correct, you want to process the messages sequentially.
As I said in my comment, I think a queue is a good option. Here is how I would do it (you will probably find a better queue implementation somewhere else, but this should give you the idea :D):
const consumer = new Consumer(this.client, [{ topic: topicKey }]);
const myQueue = [];

consumer.on('message', function (message) {
  myQueue.push(message);
});

async function consumeQueue() {
  const message = myQueue.shift();
  if (!message) {
    // The queue is empty: wait a bit before polling again.
    await sleep(3000);
  } else {
    // ... decode your message
    // POST the decoded message to the remote API, as in the question
    await fetch(...);
  }
  consumeQueue();
}

function sleep(ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}

// you have to init it :D
consumeQueue();
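A variation on the same queue idea, if you'd rather avoid the 3-second polling sleep: chain each incoming message onto a single promise, so handlers run strictly one after another. Here enqueue and the stubbed handleMessage are illustrative names; handleMessage stands in for the decode + fetch logic from the question:

```javascript
let chain = Promise.resolve();

async function handleMessage(message) {
  // ... decode the message and await fetch(...) here;
  // the timeout just simulates the remote call.
  await new Promise(resolve => setTimeout(resolve, 20));
  return message;
}

function enqueue(message) {
  chain = chain
    .then(() => handleMessage(message))
    .catch(err => console.error('handler failed:', err)); // keep the chain alive
  return chain;
}

// Usage inside the consumer:
// consumer.on('message', enqueue);
```

Each call to enqueue appends to the tail of the chain, so a handler only starts once the previous one has settled, and a single failure does not break the chain for later messages.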
Upvotes: 4