bsd

Reputation: 1247

read data from the last consumed offset in kafka using python

I am using the confluent Python Kafka library to consume messages from Kafka. Is there a way to start consuming from the last consumed message? Say I have 10 messages in a topic:

* I consume those 10 messages.
* After a while, a couple more messages are written to that topic.
* I would like to consume those new messages, skipping over the first 10, and so on.

Is that possible?

I've tried setting 'auto.offset.reset' to 'largest', but that only makes the consumer start from the point where it attaches; it does not pick up the 'unread' messages.

In other words, if I write 10 messages to a topic and then start consuming with that setting, the consumer only listens for incoming (new) messages.

This is the function for polling messages:

from confluent_kafka import Consumer, KafkaException, KafkaError
import json

def get_messages(topics=None):
    c = None
    try:
        # enable.auto.commit is a global consumer property (and defaults to true),
        # so it is set at the top level rather than under default.topic.config.
        c = Consumer({'bootstrap.servers': 'localhost:9092',
                      'group.id': 'xyz_group',
                      'enable.auto.commit': True})
        if topics is not None:
            c.subscribe(topics)
    except KafkaException as e:
        print('We got an exception: {}'.format(e))
    else:
        running = True
        while running:
            msg = c.poll()
            if not msg.error():
                msg_payload = msg.value().decode('utf-8')
                print('Received: {}'.format(msg_payload))
                msg_data = json.loads(msg_payload)
                for k, v in msg_data.items():  # iteritems() is Python 2 only
                    if k == 'signal' and v == 'stop':
                        running = False
                        # raise SystemExit('we got stop signal !')
                    else:
                        print('continue listening ...')
            elif msg.error().code() != KafkaError._PARTITION_EOF:
                print(msg.error())
                running = False
    finally:
        # close() commits the final offsets and leaves the consumer group cleanly.
        if c is not None:
            c.close()
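
To double-check where the group would resume from, the committed offsets can be queried with the consumer's committed() call. A minimal sketch, assuming a topic named 'my_topic' with a single partition 0 and the same 'xyz_group' group id:

from confluent_kafka import Consumer, TopicPartition

c = Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'xyz_group'})
# Ask the broker for the offsets this group has committed so far.
committed = c.committed([TopicPartition('my_topic', 0)], timeout=10)
for tp in committed:
    # tp.offset is the next offset the group will read from;
    # a negative value means nothing has been committed yet.
    print('{} [{}] committed offset: {}'.format(tp.topic, tp.partition, tp.offset))
c.close()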

Upvotes: 0

Views: 1844

Answers (1)

Edenhill

Reputation: 3113

What you describe is the default behaviour when auto commit is enabled (enable.auto.commit=true): the offset of the last consumed message is committed at regular intervals (auto.commit.interval.ms).

When the consumer is shut down properly (by calling close()), it commits its final offsets. When the client is restarted it picks up where it left off and consumes only messages newer than the last consumed one.
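
A minimal sketch of that flow, assuming a broker on localhost:9092, a hypothetical topic named 'my_topic', and the 'xyz_group' group id from the question; with auto commit left enabled, each run resumes after the last committed offset:

from confluent_kafka import Consumer, KafkaError

c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'xyz_group',
    'enable.auto.commit': True,       # commit consumed offsets periodically (the default)
    'auto.offset.reset': 'earliest',  # only used when the group has no committed offset yet
})
c.subscribe(['my_topic'])

try:
    while True:
        msg = c.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() != KafkaError._PARTITION_EOF:
                print(msg.error())
                break
            continue
        print('Received: {}'.format(msg.value().decode('utf-8')))
except KeyboardInterrupt:
    pass
finally:
    # close() commits the final offsets; the next run of this script
    # starts from the message after the last one consumed here.
    c.close()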

Upvotes: 1
