Reputation: 158
I have created a HighLevelProducer to publish messages to a topic stream that will be consumed by a ConsumerGroupStream, using kafka-node. When I create multiple consumers in the same ConsumerGroup to consume from that topic, only one partition is created and only one consumer is consuming. I have also tried to define the number of partitions for that topic, although I'm not sure whether it is required to define it when creating the topic and, if so, how many partitions I will need in advance. In addition, is it possible to push an object to the Transform stream rather than a string? (I currently use JSON.stringify because otherwise I get [object Object] in the consumer.)
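const { KafkaClient, HighLevelProducer, ProducerStream } = require('kafka-node');
const { Transform } = require('stream');

// Builds a writable ProducerStream backed by a HighLevelProducer.
const myProducerStream = ({ kafkaHost, highWaterMark, topic }) => {
  const kafkaClient = new KafkaClient({ kafkaHost });
  const producer = new HighLevelProducer(kafkaClient);
  const options = {
    highWaterMark,
    kafkaClient,
    producer
  };
  // Load the topic metadata up front so the client knows the topic's partitions.
  kafkaClient.refreshMetadata([topic], err => {
    if (err) throw err;
  });
  return new ProducerStream(options);
};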
// Wraps each incoming object in the { topic, messages } payload passed on to the ProducerStream.
const transform = topic => new Transform({
  objectMode: true,
  decodeStrings: true,
  transform(obj, encoding, cb) {
    console.log(`pushing message ${JSON.stringify(obj)} to topic "${topic}"`);
    cb(null, {
      topic,
      messages: JSON.stringify(obj)
    });
  }
});
// Returns the Transform stream; objects written to it are published to the topic.
const publisher = (topic, kafkaHost, highWaterMark) => {
  const myTransform = transform(topic);
  const producer = myProducerStream({ kafkaHost, highWaterMark, topic });
  myTransform.pipe(producer);
  return myTransform;
};
The consumer:
const { ConsumerGroupStream } = require('kafka-node');
// Import style depends on the installed uuid version; adjust as needed.
const { v4: uuidv4 } = require('uuid');

// Creates one member of the consumer group `groupId` reading from `sourceTopic`.
const createConsumerStream = (sourceTopic, kafkaHost, groupId) => {
  const consumerOptions = {
    kafkaHost,
    groupId,
    protocol: ['roundrobin'],
    encoding: 'utf8',
    id: uuidv4(),
    fromOffset: 'latest',
    outOfRangeOffset: 'earliest',
  };
  const consumerGroupStream = new ConsumerGroupStream(consumerOptions, sourceTopic);
  consumerGroupStream.on('connect', () => {
    console.log(`Consumer id: "${consumerOptions.id}" is connected!`);
  });
  consumerGroupStream.on('error', (err) => {
    console.error(`Consumer id: "${consumerOptions.id}" encountered an error: ${err}`);
  });
  return consumerGroupStream;
};
// AsyncMessageTransform is not shown in the question; myProducerStream is the factory from the producer code.
const publisher = (func, destTopic, consumerGroupStream, kafkaHost, highWaterMark) => {
  const messageTransform = new AsyncMessageTransform(func, destTopic);
  const resultProducerStream = myProducerStream({ kafkaHost, highWaterMark, topic: destTopic });
  consumerGroupStream.pipe(messageTransform).pipe(resultProducerStream);
};
Upvotes: 3
Views: 1995
Reputation: 1096
For the first question: the maximum number of active consumers in a group equals the number of partitions of the topic, because each partition is assigned to at most one consumer in the group.
So if TopicA has 1 partition and your consumer group has 5 consumers, 4 of them will be idle.
If TopicA has 5 partitions and your consumer group has 5 consumers, all of them will be active and consuming messages from the topic.
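To make the counting concrete, here is a small sketch that models how partitions end up spread over the members of one group. It is only a simplified round-robin model written for illustration: the real assignment is negotiated by Kafka's group coordinator, and the names assignPartitions and idleConsumers are hypothetical helpers, not part of kafka-node.

```javascript
// Simplified model of round-robin partition assignment within one consumer group.
// Hypothetical helper for illustration only; Kafka performs the real assignment.
function assignPartitions(partitionCount, consumerIds) {
  const assignment = new Map(consumerIds.map(id => [id, []]));
  if (consumerIds.length === 0) return assignment;
  for (let p = 0; p < partitionCount; p++) {
    // Each partition gets exactly one owner within the group.
    const owner = consumerIds[p % consumerIds.length];
    assignment.get(owner).push(p);
  }
  return assignment;
}

// Consumers that received no partition have nothing to read and stay idle.
function idleConsumers(assignment) {
  return [...assignment]
    .filter(([, partitions]) => partitions.length === 0)
    .map(([id]) => id);
}

const consumers = ['c1', 'c2', 'c3', 'c4', 'c5'];
console.log(idleConsumers(assignPartitions(1, consumers))); // [ 'c2', 'c3', 'c4', 'c5' ]
console.log(idleConsumers(assignPartitions(5, consumers))); // []
```

With 1 partition only the first consumer gets work; with 5 partitions every consumer owns one.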
To specify the number of partitions, create the topic from the CLI instead of relying on Kafka to auto-create it when you first publish messages.
To create a topic with a specific number of partitions:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test
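Since the question uses kafka-node, the topic can probably also be created from Node instead of the CLI. The sketch below assumes the installed kafka-node version exposes createTopics(topics, callback) on KafkaClient and that the broker supports topic creation requests; the wrapper name createTopicWithPartitions and the default replication factor of 1 are my own choices for illustration.

```javascript
// Sketch: create a topic with an explicit partition count through a kafka-node client.
// Assumes `client` exposes createTopics(topics, callback), as KafkaClient does.
// createTopicWithPartitions is a hypothetical wrapper, not a kafka-node API.
function createTopicWithPartitions(client, topic, partitions, replicationFactor = 1) {
  return new Promise((resolve, reject) => {
    client.createTopics([{ topic, partitions, replicationFactor }], (err, result) => {
      if (err) return reject(err);
      // `result` lists per-topic errors for topics that could not be created.
      resolve(result);
    });
  });
}

// Usage (needs a reachable broker; host and topic name are placeholders):
// const { KafkaClient } = require('kafka-node');
// const client = new KafkaClient({ kafkaHost: 'localhost:9092' });
// createTopicWithPartitions(client, 'test', 3, 3).then(console.log, console.error);
```

The client is passed in rather than created inside the function so the same connection used by the producer can be reused.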
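To alter the number of partitions of an existing topic:
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic test --partitions 40
(On Kafka 3.0 and later, kafka-topics.sh no longer accepts --zookeeper; pass --bootstrap-server with a broker address instead, as in the create command.)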
Note that you can only increase the number of partitions; you cannot decrease it.
For details, see the Kafka documentation: https://kafka.apache.org/documentation.html
If you'd like to understand Kafka in more depth, the free book is worth reading: https://www.confluent.io/resources/kafka-the-definitive-guide/
Upvotes: 2