paka

Reputation: 1641

kafka-node several consumers

I have a basic example that works like a charm with one consumer: it receives the messages. But when I add a second consumer, it is ignored.

let kafka = require('kafka-node');
let client = new kafka.Client();

let producer = new kafka.Producer(client);

let consumer1 =  new kafka.Consumer(client,[ {topic: 'topic1', partition: 0}]);
let consumer2 =  new kafka.Consumer(client,[ {topic: 'topic2', partition: 0}]);


producer.on('ready', function () {

    producer.send([{topic:'topic1', messages: 'topic 1 msg'}], (err,data)=>{
        console.log(err,'1 sent');
    });
    producer.send([{topic:'topic2', messages: 'topic 1 msg'}], (err,data)=>{
        console.log(err, '2 sent');
    });

});
producer.on('error', function (err) {
    console.log('err', err);
})



consumer1.on('message',(message) =>{
    console.log(11, message);
});
consumer2.on('message',(message) =>{
    console.log(22, message);
})

The issue is that the 'message' event for consumer2 (the one that logs 22) never fires. Data on that topic exists when I check it with the command-line tools.

Upvotes: 4

Views: 5874

Answers (2)

Kumar

Reputation: 41

If a topic has only one partition in your Kafka cluster, you cannot have more than one active consumer for it within the same consumer group. If you want to use n consumers, you need at least n partitions. It is explained here in more detail.
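To make the partition rule concrete, here is a toy model of how partitions might be spread across the consumers of one group. It is only a sketch and not kafka-node code: `assignPartitions` is a hypothetical helper, and round-robin is an assumed strategy chosen for illustration.

```javascript
// Toy model of partition assignment inside ONE consumer group.
// assignPartitions is a hypothetical helper, not part of kafka-node.
function assignPartitions(partitionCount, consumerIds) {
    const assignment = {};
    for (const id of consumerIds) {
        assignment[id] = [];
    }
    // Each partition is owned by exactly one consumer (round-robin assumed here).
    for (let p = 0; p < partitionCount; p++) {
        const owner = consumerIds[p % consumerIds.length];
        assignment[owner].push(p);
    }
    return assignment;
}

// One partition, two consumers: c2 owns nothing and stays idle.
console.log(assignPartitions(1, ['c1', 'c2']));
// Two partitions, two consumers: each owns one partition.
console.log(assignPartitions(2, ['c1', 'c2']));
```

With a single partition the second consumer ends up with an empty list, which is the situation described above.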

Upvotes: 0

streetturtle

Reputation: 5850

You forgot to specify a consumer group in the consumer options. In your case both consumers belong to the same consumer group (which by default is called kafka-node-group). Within one consumer group, every message is delivered only once. And since your topic has only one partition (partition 0), only the first consumer will process the message.

If you want to send the same message to both consumers, you need two consumer groups (one consumer per group). You can do it the following way:

let consumer1 =  new kafka.Consumer(client,[ {topic: 'topic1', partition: 0}], {groupId: 'group1'});
let consumer2 =  new kafka.Consumer(client,[ {topic: 'topic2', partition: 0}], {groupId: 'group2'});

If you want your consumers to process messages together (split by partition), you need to increase the number of partitions in your topic.
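The "once per consumer group" behaviour can be pictured with a small simulation. This is a rough sketch under stated assumptions, not kafka-node's implementation: `receiversFor` is a hypothetical helper, all consumers are assumed to read the same single-partition topic, and the first consumer listed in a group is assumed to own that partition.

```javascript
// Toy model: a message on a single-partition topic is delivered
// once per consumer group. receiversFor is a hypothetical helper,
// not part of kafka-node.
function receiversFor(consumers) {
    const servedGroups = new Set();
    const receivers = [];
    for (const consumer of consumers) {
        // Assumption: the first consumer seen in a group owns the partition.
        if (!servedGroups.has(consumer.groupId)) {
            servedGroups.add(consumer.groupId);
            receivers.push(consumer.name);
        }
    }
    return receivers;
}

const sameGroup = [
    { name: 'consumer1', groupId: 'kafka-node-group' },
    { name: 'consumer2', groupId: 'kafka-node-group' },
];
const separateGroups = [
    { name: 'consumer1', groupId: 'group1' },
    { name: 'consumer2', groupId: 'group2' },
];

console.log(receiversFor(sameGroup));      // only consumer1 receives the message
console.log(receiversFor(separateGroups)); // both consumers receive it
```

In this model, giving each consumer its own groupId is what lets both of them see the message.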

Upvotes: 7
