Mekan Allaberdiyev

Reputation: 35

Kafka-Python consume last unread message

I have two microservices.

  1. MProducer - sending messages to kafka queue
  2. MConsumer - reading messages from kafka queue

When the consumer crashes and restarts, I want it to continue consuming from the last unread message.

from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                         auto_offset_reset='latest',
                         enable_auto_commit=False)

Upvotes: 1

Views: 1948

Answers (2)

Mickael Maison

Reputation: 26885

It looks like you are using kafka-python, so you'll need to pass the group_id argument to your Consumer. See the description for this argument in the KafkaConsumer documentation.

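With a group id set (and enable_auto_commit left at its default of True), the consumer periodically commits its position to Kafka and automatically resumes from it after a restart. With enable_auto_commit=False, as in your snippet, you need to call commit() yourself after processing messages.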
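A minimal sketch of what this could look like with kafka-python, assuming manual commits as in the question's snippet. The helper names (kafka_python_kwargs, run), the topic and the group name are hypothetical; only KafkaConsumer, its group_id / enable_auto_commit arguments, commit() and close() come from the library.

```python
def kafka_python_kwargs(group_id):
    """Build KafkaConsumer keyword arguments (hypothetical helper)."""
    return {
        "bootstrap_servers": "localhost:9092",
        # A group id is what enables offset commits in kafka-python.
        "group_id": group_id,
        # Only used when the group has no committed offset yet.
        "auto_offset_reset": "latest",
        # Manual commits, matching the question's setting.
        "enable_auto_commit": False,
    }


def run(topic, group_id):
    """Consume forever, committing after each processed message."""
    from kafka import KafkaConsumer  # requires the kafka-python package

    consumer = KafkaConsumer(topic, **kafka_python_kwargs(group_id))
    try:
        for message in consumer:
            # Placeholder for real processing.
            print(message.topic, message.partition, message.offset, message.value)
            # Commit the consumed position so a restart resumes after it.
            consumer.commit()
    finally:
        consumer.close()
```

Calling run("my-topic", "mconsumer-group") against a reachable broker should, after a crash and restart with the same group id, pick up from the last committed offset rather than from the end of the topic. Committing after every message is the simplest choice here, not the fastest one.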

Upvotes: 2

Oras

Reputation: 1096

You do that by using a consumer group. Assuming you're using the Confluent library, just add 'group.id': 'your-group' to the consumer configuration. When the service goes down and comes back up, it will resume from the last committed offset.
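As a rough sketch under the same assumption (the confluent-kafka Python client), the configuration and a manual-commit loop might look like this. The helper names (confluent_config, run_confluent), the broker address and the topic are hypothetical.

```python
def confluent_config(group_id):
    """Build a confluent-kafka consumer config (hypothetical helper)."""
    return {
        "bootstrap.servers": "localhost:9092",
        "group.id": group_id,
        # Only used when the group has no committed offset yet.
        "auto.offset.reset": "latest",
        # Commit explicitly after processing instead of on a timer.
        "enable.auto.commit": False,
    }


def run_confluent(topic, group_id):
    """Poll forever, committing each message after it is processed."""
    from confluent_kafka import Consumer  # requires the confluent-kafka package

    consumer = Consumer(confluent_config(group_id))
    consumer.subscribe([topic])
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                continue  # nothing arrived within the timeout
            if msg.error():
                print("consumer error:", msg.error())
                continue
            # Placeholder for real processing.
            print(msg.topic(), msg.partition(), msg.offset(), msg.value())
            # Synchronous commit of this message's offset.
            consumer.commit(message=msg, asynchronous=False)
    finally:
        consumer.close()
```

Leaving 'enable.auto.commit' at its default would also work and removes the explicit commit call, at the cost of less control over when an offset counts as processed.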

The committed offsets for each consumer group are stored in a special internal Kafka topic called __consumer_offsets (since v0.9). More information is available in the Kafka docs [https://kafka.apache.org/intro#intro_consumers]

Upvotes: 0
