Loay Ashmawy

Reputation: 687

kafka consumer receiving overhead?

I have a Kafka consumer that does a poll every 10 seconds. I am using Wireshark to monitor my network's activity.

I noticed that even when I am not issuing any fetch requests, there is still traffic between the broker and my consumer. I also noticed that the same packets (almost identical, with only a slight change to the payload) are sent and received periodically.

Are these some kind of keep-alive packets? How can I reduce them?

Here is a screenshot of those packets:

PS: I am using cppkafka as the client library and Kafka broker 0.8.2.2

EDIT: The client's code

#include <chrono>
#include <csignal>
#include <iostream>
#include <string>
#include <thread>

#include <boost/program_options.hpp>
#include <cppkafka/cppkafka.h>

using std::cout;
using std::endl;
using std::exception;
using std::string;
using namespace cppkafka;
namespace po = boost::program_options;

bool running = true;

int main(int argc, char* argv[]) {
    string brokers;
    string topic_name;
    string group_id;

    po::options_description options("Options");
    options.add_options()
        ("help,h",     "produce this help message")
        ("brokers,b",  po::value<string>(&brokers)->required(), 
                       "the kafka broker list")
        ("topic,t",    po::value<string>(&topic_name)->required(),
                       "the topic to consume from")
        ("group-id,g", po::value<string>(&group_id)->required(),
                       "the consumer group id")
        ;

    po::variables_map vm;

    try {
        po::store(po::command_line_parser(argc, argv).options(options).run(), vm);
        po::notify(vm);
    }
    catch (exception& ex) {
        cout << "Error parsing options: " << ex.what() << endl;
        cout << endl;
        cout << options << endl;
        return 1;
    }

    // Stop processing on SIGINT
    signal(SIGINT, [](int) { running = false; });

    // Construct the configuration
    Configuration config = {
        { "metadata.broker.list", brokers },
        { "api.version.request", false },
        { "broker.version.fallback", "0.8.2.2" },   
        { "group.id", group_id },
        // Disable auto commit
        { "enable.auto.commit", false }
    };

    // Create the consumer
    Consumer consumer(config);

    // Subscribe to the topic
    TopicPartitionList topicList;
    cppkafka::TopicPartition topPar(topic_name,0);
    topPar.set_offset(0);
    topicList.push_back(topPar);
    cout << "Consuming messages from topic " << topic_name << endl;

    consumer.assign(topicList);

    // Consume messages until SIGINT
    while (running) {
        // Try to consume a message
        Message msg = consumer.poll();
        if (msg) {
            // If we managed to get a message
            if (msg.get_error()) {
                // Ignore EOF notifications from rdkafka
                if (!msg.is_eof()) {
                    cout << "[+] Received error notification: " << msg.get_error() << endl;
                } else {
                    std::this_thread::sleep_for(std::chrono::milliseconds(10000));
                }
            } else {
                // Print the key (if any)
                if (msg.get_key()) {
                    cout << msg.get_key() << " -> ";
                }
                // Print the payload
                cout << msg.get_payload() << endl;
            }
        }
    }
}

Upvotes: 2

Views: 1103

Answers (2)

Mickael Maison

Reputation: 26885

Cppkafka is built on top of librdkafka. Librdkafka attempts to prefetch messages for all assigned partitions, so messages are immediately available when you call poll().

By default, librdkafka is pretty aggressive (it aims for best performance), hence the few FetchRequests per second you're seeing.

For more details, see librdkafka's FAQ.
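
If you want to reduce that background fetch traffic, one option is to make the prefetcher less aggressive through librdkafka's fetch settings. A sketch with illustrative values (not defaults — tune them to your latency needs):

```
# librdkafka consumer properties (illustrative values, not defaults)
fetch.wait.max.ms=1000      # let the broker hold an empty fetch open for up to 1s
fetch.min.bytes=1           # minimum data the broker should accumulate before replying
fetch.error.backoff.ms=500  # back off this long after a fetch error
queued.min.messages=1       # stop prefetching once this many messages are queued locally
```

In cppkafka these can be added to the same `Configuration` initializer list as `group.id` in the question's code. The trade-off is latency: fewer, larger fetches mean messages may sit on the broker slightly longer before your `poll()` sees them.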

Upvotes: 3

sap1ens

Reputation: 2957

You are probably seeing heartbeat messages used to keep the consumer group alive. You can find more info about them here: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-GroupMembershipAPI

You can adjust the heartbeat interval by modifying heartbeat.interval.ms; check the librdkafka configuration.
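
A sketch of the relevant librdkafka properties (illustrative values, not defaults); note that heartbeat.interval.ms must stay well below session.timeout.ms, or the group coordinator may consider the consumer dead:

```
# librdkafka consumer group properties (illustrative values)
heartbeat.interval.ms=10000  # send a heartbeat every 10s
session.timeout.ms=30000     # coordinator evicts the consumer after 30s without one
```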

Upvotes: 0
