Reputation: 1247
I am using the Confluent Python Kafka client (confluent-kafka) to consume messages from Kafka. Is there a way to resume consuming from the last consumed message? Say I have 10 messages in a topic and I consume all 10 of them. After a while, a couple more messages are written to that topic. I would like to consume only those new messages, skipping the first 10 I have already read, and so on for later batches. Is that possible?
I've tried setting 'auto.offset.reset' to 'largest', but then the consumer only reads from that point on; it does not consume the 'unread' messages.
That is, if I write 10 messages to a topic and then start consuming with that setting, the consumer only listens for incoming (new) messages.
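For context, 'largest' behaves this way because of how a consumer picks its starting position: a committed offset for the group takes priority, and auto.offset.reset is only a fallback used when the group has no committed offset (or the committed one is out of range). The sketch below is a toy model in plain Python, not the confluent-kafka API; the function starting_offset and its parameters are hypothetical, and the accepted reset values are assumed to mirror the aliases librdkafka documents.

```python
# Toy model (plain Python, NOT the confluent-kafka API) of how a Kafka
# consumer group chooses where to start reading a partition.

RESET_TO_START = {"smallest", "earliest", "beginning"}
RESET_TO_END = {"largest", "latest", "end"}


def starting_offset(committed, log_start, log_end, reset="latest"):
    """Return the offset a consumer group would start reading from.

    committed: the group's committed offset (next offset to read), or None
               if the group has never committed for this partition.
    log_start: offset of the oldest message still stored in the partition.
    log_end:   offset the next produced message will receive.
    reset:     value of auto.offset.reset, used only as a fallback.
    """
    # A valid committed offset always wins; the reset policy is ignored.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # No usable committed offset: fall back to the reset policy.
    if reset in RESET_TO_START:
        return log_start
    if reset in RESET_TO_END:
        return log_end
    raise ValueError("reset value not handled by this toy model: {!r}".format(reset))


# A partition holding 10 messages (offsets 0..9); the next one will get 10.
print(starting_offset(None, 0, 10, reset="largest"))   # new group: 10, skips all
print(starting_offset(None, 0, 10, reset="earliest"))  # new group: 0, reads all
print(starting_offset(10, 0, 12, reset="largest"))     # existing group: resumes at 10
```

In this model, a brand-new group with 'largest' starts at the end of the log, which is exactly the "only new messages" behaviour described above, while a group that has already committed ignores the reset setting entirely.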
This is the function for polling messages:
import json

from confluent_kafka import Consumer, KafkaError, KafkaException


def get_messages(topics=None):
    # enable.auto.commit is a consumer-level property (and already defaults
    # to true), so it belongs at the top level, not in 'default.topic.config'.
    # The consumer is created before the try block so that c always exists
    # when the finally clause calls c.close().
    c = Consumer({
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'xyz_group',
        'enable.auto.commit': True,
    })
    try:
        if topics is not None:
            c.subscribe(topics)
        running = True
        while running:
            msg = c.poll(1.0)  # wait up to 1 second for a message
            if msg is None:
                continue  # nothing arrived within the timeout
            if msg.error():
                # Reaching the end of a partition is informational, not fatal.
                if msg.error().code() != KafkaError._PARTITION_EOF:
                    print(msg.error())
                    running = False
                continue
            msg_payload = msg.value().decode('utf-8')
            print('Received: {}'.format(msg_payload))
            msg_data = json.loads(msg_payload)
            # A payload such as {"signal": "stop"} ends the loop.
            if msg_data.get('signal') == 'stop':
                running = False
            else:
                print('continue listening ...')
    except KafkaException as e:
        print('We got an exception: {}'.format(e))
    finally:
        # close() commits the final offsets (auto commit is on) and leaves the group.
        c.close()
Upvotes: 0
Views: 1844
Reputation: 3113
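What you describe is the default behaviour, namely auto commit being enabled (enable.auto.commit=true), as long as the consumer keeps using the same group.id, since committed offsets are stored per consumer group.
The offset of the last consumed message is committed at regular intervals (auto.commit.interval.ms).
When the consumer is shut down properly (by calling close()), it also commits its final offsets.
Upon restarting, the client picks up where it left off and consumes only messages newer than the last consumed one.
auto.offset.reset is only consulted when the group has no committed offset (or the committed offset is out of range), which is why setting it to 'largest' for a group that has never committed skips the messages already in the topic.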
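The commit-and-resume cycle described above can be illustrated with a toy simulation. This is plain Python and not the confluent-kafka API: ToyBroker, ToyConsumer and poll_all are hypothetical names, a single partition is assumed, and the commit in close() stands in for the final commit the real client performs when auto commit is enabled.

```python
# Toy simulation (plain Python, NOT the confluent-kafka API) of a consumer
# group that commits its position and resumes from it after a restart.


class ToyBroker:
    """Hypothetical stand-in for one topic partition plus its offset store."""

    def __init__(self):
        self.log = []        # stored messages; list index == offset
        self.committed = {}  # group.id -> next offset to read

    def produce(self, value):
        self.log.append(value)


class ToyConsumer:
    """Hypothetical consumer with auto.offset.reset='latest' semantics."""

    def __init__(self, broker, group_id):
        self.broker = broker
        self.group_id = group_id
        # Resume from the group's committed offset if it has one; otherwise
        # start at the end of the log (like auto.offset.reset='latest').
        self.position = broker.committed.get(group_id, len(broker.log))

    def poll_all(self):
        """Return every message after the current position and advance."""
        new = self.broker.log[self.position:]
        self.position = len(self.broker.log)
        return new

    def close(self):
        # Stands in for the final offset commit done on a clean close().
        self.broker.committed[self.group_id] = self.position


broker = ToyBroker()
first = ToyConsumer(broker, "xyz_group")  # new group, empty log: starts at 0
for i in range(10):
    broker.produce("msg-{}".format(i))
print(first.poll_all())  # the 10 messages msg-0 .. msg-9
first.close()            # commits offset 10 for xyz_group

broker.produce("msg-10")
broker.produce("msg-11")
second = ToyConsumer(broker, "xyz_group")  # restart with the same group.id
print(second.poll_all())  # ['msg-10', 'msg-11']
```

A consumer started under a different group.id has no committed offset and, in this model, falls back to the end of the log, which matches what the question observed with 'largest'.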
Upvotes: 1