Reputation: 2421
I am a newbie to Kafka and I am trying to set up a consumer in Kafka such that it reads messages published by a Kafka producer.
Correct me if I am wrong, but the way I understand it, the Kafka consumer stores its offset in ZooKeeper? However, I don't have a ZooKeeper instance running, and I want to poll, say, every 5 minutes to see if any new messages have been published.
So far, the code that I have is:
import logging
from django.conf import settings
import kafka
import sys
import json
bootstrap_servers = ['localhost:8080']
topicName = 'test-info'
consumer = kafka.KafkaConsumer(topicName,
                               group_id='test',
                               bootstrap_servers=bootstrap_servers,
                               auto_offset_reset='earliest')
count = 0
#print(consumer.topic)
try:
    for message in consumer:
        #print(type(message.value))
        print("\n")
        print("<>"*20)
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
        print("--"*20)
        info = json.loads(message.value)
        if info['event'] == "new_record" and info['data']['userId'] == "user1" and info['data']['details']['userTeam'] == "foo":
            count = count + 1
            print(count, info['data']['details']['team'], info['data']['details']['leadername'], info['data']['details']['category'])
        else:
            print("Skipping")
        print(count)
except KeyboardInterrupt:
    sys.exit()
How can I save the offset such that next time it polls it reads incremental data?
Upvotes: 2
Views: 7090
Reputation: 3748
Kafka now stores consumer offsets in an internal topic (__consumer_offsets), not in ZooKeeper.
We have two options:

1. Let the client commit offsets automatically:
enable_auto_commit=True

2. Disable auto-commit and commit manually after processing each message:
from kafka import TopicPartition, OffsetAndMetadata
# set enable_auto_commit to False
enable_auto_commit=False
# After consuming the message, commit the offset of the next message to read.
consumer.commit({TopicPartition(topic_name, message.partition): OffsetAndMetadata(message.offset + 1, '')})
Follow @DennisLi's approach, or simply re-run the consumer after five minutes.
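As a minimal sketch of the manual-commit option with kafka-python (the topic, group, and server names are placeholders, and the print call stands in for real message processing):

```python
def next_commit_offset(last_consumed_offset):
    # Kafka expects the committed offset to be the offset of the *next*
    # message to read, hence the + 1.
    return last_consumed_offset + 1

def consume_with_manual_commit(topic, servers, group):
    # kafka-python is a third-party package (pip install kafka-python);
    # imported here so the helper above works without it installed.
    from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata

    consumer = KafkaConsumer(topic,
                             group_id=group,
                             bootstrap_servers=servers,
                             enable_auto_commit=False,
                             auto_offset_reset='earliest')
    for message in consumer:
        print(message.value)  # process the message here
        tp = TopicPartition(message.topic, message.partition)
        # Commit after processing, so a crash before this line means
        # the message is re-delivered rather than lost.
        consumer.commit({tp: OffsetAndMetadata(next_commit_offset(message.offset), '')})
```

Committing after processing gives at-least-once delivery; committing before would give at-most-once.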
Upvotes: 0
Reputation: 4156
It's true that older Kafka consumers stored offsets in ZooKeeper; modern Kafka stores them in an internal topic on the brokers, so you don't need to run your own ZooKeeper instance for this.
In your case, you don't have to do anything more, as you already set the group_id (group_id='test'). The consumer will therefore automatically continue consuming from the last committed offset for that group, because it commits the latest offset automatically (enable_auto_commit is True by default).
for more info you can check here
If you want to check every 5 minutes whether any new messages have been published, you can add time.sleep(300)
to your consumer loop.
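A rough sketch of that five-minute polling loop (the topic, group, and server names are example values; consumer_timeout_ms makes the for loop exit once the topic goes idle so the sleep can run):

```python
import time

POLL_INTERVAL_SECONDS = 300  # 5 minutes between polls

def run_polling_consumer(topic, servers, group):
    # kafka-python is third-party; imported inside the function so the
    # constant above is usable without the package installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(topic,
                             group_id=group,
                             bootstrap_servers=servers,
                             auto_offset_reset='earliest',
                             enable_auto_commit=True,
                             consumer_timeout_ms=10000)  # stop iterating after 10s with no messages
    while True:
        for message in consumer:
            print(message.offset, message.value)  # process the message here
        time.sleep(POLL_INTERVAL_SECONDS)
```

Because the group's offsets are committed automatically, each pass through the loop picks up only messages published since the previous pass.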
Upvotes: 3