Reputation: 687
I have a Kafka consumer that polls every 10 seconds. I am using Wireshark to monitor my network's activity.
I noticed that even when I am not doing any fetch requests, there is still traffic between the broker and my consumer. I also noticed that the same packets (almost identical, with only slight changes to the payload) are sent and received periodically.
Are these some kind of keep-alive packets? How can I reduce them?
Here is a screenshot of those packets:
PS: I am using cppkafka as the client library and Kafka broker 0.8.2.2.
EDIT: The client's code:
#include <chrono>
#include <csignal>
#include <iostream>
#include <string>
#include <thread>
#include <boost/program_options.hpp>
#include <cppkafka/cppkafka.h>

using namespace std;
using namespace cppkafka;
namespace po = boost::program_options;

// Cleared by the SIGINT handler to stop the consume loop
volatile sig_atomic_t running = 1;

int main(int argc, char* argv[]) {
    string brokers;
    string topic_name;
    string group_id;

    po::options_description options("Options");
    options.add_options()
        ("help,h", "produce this help message")
        ("brokers,b", po::value<string>(&brokers)->required(),
         "the kafka broker list")
        ("topic,t", po::value<string>(&topic_name)->required(),
         "the topic to consume from")
        ("group-id,g", po::value<string>(&group_id)->required(),
         "the consumer group id")
        ;

    po::variables_map vm;
    try {
        po::store(po::command_line_parser(argc, argv).options(options).run(), vm);
        po::notify(vm);
    }
    catch (exception& ex) {
        cout << "Error parsing options: " << ex.what() << endl;
        cout << endl;
        cout << options << endl;
        return 1;
    }

    // Stop processing on SIGINT
    signal(SIGINT, [](int) { running = 0; });

    // Construct the configuration
    Configuration config = {
        { "metadata.broker.list", brokers },
        { "api.version.request", false },
        { "broker.version.fallback", "0.8.2.2" },
        { "group.id", group_id },
        // Disable auto commit
        { "enable.auto.commit", false }
    };

    // Create the consumer
    Consumer consumer(config);

    // Manually assign partition 0 of the topic, starting at offset 0
    TopicPartitionList topicList;
    cppkafka::TopicPartition topPar(topic_name, 0);
    topPar.set_offset(0);
    topicList.push_back(topPar);

    cout << "Consuming messages from topic " << topic_name << endl;
    consumer.assign(topicList);

    // Poll for messages until SIGINT is received
    while (running) {
        // Try to consume a message
        Message msg = consumer.poll();
        if (msg) {
            // If we managed to get a message
            if (msg.get_error()) {
                // Ignore EOF notifications from rdkafka
                if (!msg.is_eof()) {
                    cout << "[+] Received error notification: " << msg.get_error() << endl;
                } else {
                    // Reached the end of the partition: wait 10 seconds before polling again
                    std::this_thread::sleep_for(std::chrono::milliseconds(10000));
                }
            } else {
                // Print the key (if any)
                if (msg.get_key()) {
                    cout << msg.get_key() << " -> ";
                }
                // Print the payload
                cout << msg.get_payload() << endl;
            }
        }
    }
}
Upvotes: 2
Views: 1103
Reputation: 26885
Cppkafka is built on top of librdkafka. Librdkafka prefetches messages for all assigned partitions in the background, independently of your poll() calls, so that messages are immediately available when you do call poll().
By default, librdkafka is fairly aggressive (it aims for the best performance), hence the several FetchRequests per second you're seeing.
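As a rough illustration of why the traffic never stops: when the assigned partition has no new data, the broker holds each FetchRequest for up to `fetch.wait.max.ms` (a librdkafka consumer property) before returning an empty response, and librdkafka then sends the next one. The helper below is a hypothetical back-of-the-envelope model under those assumptions (one broker, one partition, round-trip time ignored), not a description of librdkafka internals. It suggests that raising `fetch.wait.max.ms`, for example by adding `{ "fetch.wait.max.ms", 5000 }` to the cppkafka `Configuration` in the question, should reduce idle FetchRequests roughly proportionally. The default value differs between librdkafka versions, so check its configuration documentation.

```cpp
#include <cassert>

// Hypothetical helper: estimates how many empty FetchRequests an idle consumer
// sends per minute to a single broker, ASSUMING the broker holds every request
// for the full fetch.wait.max.ms and the client re-sends immediately.
// Network round-trip time is ignored, so real rates will be somewhat lower.
double idle_fetch_requests_per_minute(int fetch_wait_max_ms) {
    assert(fetch_wait_max_ms > 0);  // a non-positive wait makes the model meaningless
    return 60000.0 / fetch_wait_max_ms;
}
```

For example, `idle_fetch_requests_per_minute(500)` gives about 120 requests per minute, while `idle_fetch_requests_per_minute(5000)` gives about 12. Because the broker answers as soon as `fetch.min.bytes` of data is available, a longer wait mainly affects idle periods rather than delivery of new messages.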
For more details, see librdkafka's FAQ:
Upvotes: 3
Reputation: 2957
You are probably seeing heartbeat messages that keep the consumer group alive; you can find more info about them here: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-GroupMembershipAPI
You can adjust the heartbeat interval by changing heartbeat.interval.ms; see the librdkafka configuration documentation.
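A small sketch of how to size the interval, assuming the usual Kafka guideline that `heartbeat.interval.ms` should be lower than `session.timeout.ms` and typically no more than one third of it (both are librdkafka consumer properties). The helper names are hypothetical. Note that heartbeats belong to the group membership API introduced in Kafka 0.9, so whether they appear at all with a 0.8.2.2 broker is worth checking against the capture.

```cpp
#include <cassert>

// Hypothetical helper: checks a heartbeat.interval.ms / session.timeout.ms pair
// against the guideline that the heartbeat interval be at most one third of the
// session timeout, so a couple of heartbeats can be lost before the member
// is considered dead.
bool heartbeat_settings_ok(int heartbeat_interval_ms, int session_timeout_ms) {
    assert(heartbeat_interval_ms > 0 && session_timeout_ms > 0);
    return heartbeat_interval_ms * 3 <= session_timeout_ms;
}

// Hypothetical helper: approximate heartbeats per minute for one group member
// (ignores rebalances and any other group traffic).
double heartbeats_per_minute(int heartbeat_interval_ms) {
    assert(heartbeat_interval_ms > 0);
    return 60000.0 / heartbeat_interval_ms;
}
```

For example, a 3000 ms interval with a 10000 ms session timeout passes the check and yields about 20 heartbeats per minute, while a 5000 ms interval with the same timeout does not pass.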
Upvotes: 0