iDev

Reputation: 2421

Python Kafka consumer with offset management

I am a newbie to Kafka and I am trying to set up a consumer that reads the messages published by a Kafka producer.

Correct me if I am wrong, but the way I understand it, the Kafka consumer stores its offsets in ZooKeeper. However, I don't have a ZooKeeper instance running, and I want to poll, say, every 5 minutes to see whether any new messages have been published.

So far, the code that I have is:

import logging
from django.conf import settings
import kafka
import sys
import json

bootstrap_servers = ['localhost:8080']
topicName = 'test-info'
consumer = kafka.KafkaConsumer(topicName,
                               group_id='test',
                               bootstrap_servers=bootstrap_servers,
                               auto_offset_reset='earliest')

count = 0
#print(consumer.topic)
try:
    for message in consumer:
        #print(type(message.value))
        print("\n")
        print("<>"*20)
        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key, message.value))
        print("--"*20)
        info = json.loads(message.value)

        if info['event'] == "new_record" and info['data']['userId'] == "user1" and info['data']['details']['userTeam'] == "foo":
           count = count + 1
           print(count, info['data']['details']['team'], info['data']['details']['leadername'],info['data']['details']['category'])
        else:
            print("Skipping")

    print(count)


except KeyboardInterrupt:
    sys.exit()

How can I save the offset so that, the next time the consumer polls, it reads only the new (incremental) data?

Upvotes: 2

Views: 7090

Answers (2)

Muhammad Faizan Fareed

Reputation: 3748

Kafka now stores consumer offsets in an internal topic (__consumer_offsets) rather than in ZooKeeper.

Commit offset

We have two options:

1. Auto-commit: pass enable_auto_commit=True (the default) when creating the consumer, and the offsets are committed for you in the background.

2. Manual commit: set enable_auto_commit=False and commit the offset yourself after processing each message (a fuller sketch follows below):

from kafka import TopicPartition, OffsetAndMetadata

# Pass enable_auto_commit=False when creating the consumer
enable_auto_commit=False

# After consuming and processing the message, commit the next offset.
consumer.commit({TopicPartition(topic_name, message.partition): OffsetAndMetadata(message.offset + 1, '')})
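For context, here is a minimal end-to-end sketch of the manual-commit option, assuming kafka-python, the question's 'test-info' topic and 'test' group, and a broker on localhost:9092 (adjust the address to your setup):

import json
import kafka
from kafka import TopicPartition, OffsetAndMetadata

# Topic and group taken from the question; the broker address is illustrative.
consumer = kafka.KafkaConsumer(
    'test-info',
    group_id='test',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=False,   # we commit explicitly below
)

for message in consumer:
    info = json.loads(message.value)
    # ... process the message here ...

    # Commit the offset of the *next* message to read for this partition.
    # Note: on older kafka-python releases OffsetAndMetadata takes (offset, metadata);
    # newer releases may also expect a leader_epoch field.
    consumer.commit({
        TopicPartition(message.topic, message.partition):
            OffsetAndMetadata(message.offset + 1, '')
    })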

Follow @DennisLi's approach, or simply re-run the consumer every five minutes.

Upvotes: 0

DennisLi

Reputation: 4156

  1. It's true that (older) Kafka consumers store offsets in ZooKeeper. Since you don't have ZooKeeper installed separately, Kafka is probably using the ZooKeeper instance bundled with its distribution.

  2. In your case, you don't have to do anything more, as you have already set group_id='test'. The consumer will therefore automatically continue consuming from the last committed offset for that group, because the latest offset is committed automatically (enable_auto_commit is True by default). For more details, see the kafka-python documentation.

  3. If you want to check every 5 minutes whether any new messages have been published, you can add time.sleep(300) to your consumer loop, as in the sketch below.
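In case it's useful, here is a rough sketch of that periodic pattern using kafka-python's poll(); the topic, group, and 5-minute interval come from the question, the broker address is illustrative, and it relies on the default auto-commit behaviour described in point 2:

import json
import time
import kafka

consumer = kafka.KafkaConsumer(
    'test-info',
    group_id='test',
    bootstrap_servers=['localhost:9092'],  # adjust to your broker
    auto_offset_reset='earliest',
)

while True:
    # poll() returns {TopicPartition: [ConsumerRecord, ...]} with whatever was
    # published since the group's last committed offset; an empty dict means
    # there is nothing new yet.
    records = consumer.poll(timeout_ms=1000)
    for tp, messages in records.items():
        for message in messages:
            info = json.loads(message.value)
            print(message.topic, message.partition, message.offset, info.get('event'))
    time.sleep(300)  # wait 5 minutes before checking again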

Upvotes: 3
