Reputation: 13
For example, there are two consumers in one group. Two consumers must get different messages from the topic.
I use like following, but it seems does not work. Two consumers get all messages from the topic.
std::string topic_str = "sample";
std::string errstr;
int32_t partition = 0;
int64_t start_offset = RdKafka::Topic::OFFSET_END;
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
//...omit...
conf->set("broker.version.fallback", "0.8.2.2", errstr);
conf->set("group.id", "group_001", errstr);
//...omit...
RdKafka::Consumer *consumer = RdKafka::Consumer::create(conf, errstr);
RdKafka::Topic *topic = RdKafka::Topic::create(consumer, topic_str, tconf, errstr);
RdKafka::ErrorCode resp = consumer->start(topic, partition, start_offset);
//...omit...
while (1) {
//...omit...
RdKafka::Message *msg = consumer->consume(topic, partition, 1000);
//print message and offset
}
Upvotes: 1
Views: 2017
Reputation: 3113
librdkafka only suppoprts the new broker-based balanced consumer groups (KafkaConsumer class) added in Kafka 0.9. (The consumer group balancing on Kafka 0.8 was based on Zookeeper and only implemented in the official Scala client.)
You are also using the legacy low-level consumer (Consumer class) that does not have any form of balanced consumer support.
I suggest upgrading your Kafka cluster to 0.9 (or 0.10!) and changing your code to use the new KafkaConsumer class instead.
Example here: https://github.com/edenhill/librdkafka/blob/master/examples/rdkafka_consumer_example.cpp
Upvotes: 1