Right after creating a kafka.Consumer instance, I want to adjust the offsets with the following code: decrease the committed offset of every partition assigned to this specific consumer member by one, so that the previous messages are processed again. The offsets are updated after committing; however, ReadMessage fails with a timeout error when it is called.
import (
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func AdjustConsumerOffset(c *kafka.Consumer) error {
	// milliseconds for Poll and Committed; multiplied by time.Second for ReadMessage
	const TimeOut = 100

	// poll until the rebalance has assigned at least one partition
	balancing := true
	for balancing {
		c.Poll(TimeOut)
		assignments, err := c.Assignment()
		if err != nil {
			return err
		}
		if len(assignments) > 0 {
			balancing = false
		}
	}

	// get consumer assigned partitions
	assignments, err := c.Assignment()
	if err != nil {
		return err
	}

	// look up the committed offsets of the assigned partitions
	assignments, err = c.Committed(assignments, TimeOut)
	if err != nil {
		return err
	}

	var decreasePartitions []kafka.TopicPartition
	partitionsMap := make(map[int32]kafka.TopicPartition)

	// decrease partitions offset by one if the offset is more than zero.
	for _, partition := range assignments {
		if partition.Offset > 0 {
			partition.Offset--
			partitionsMap[partition.Partition] = partition
		}
		decreasePartitions = append(decreasePartitions, partition)
	}

	// commit new offsets
	if _, err := c.CommitOffsets(decreasePartitions); err != nil {
		return err
	}

	for len(partitionsMap) > 0 {
		msg, err := c.ReadMessage(TimeOut * time.Second)
		if err != nil {
			if err.(kafka.Error).Code() == kafka.ErrTimedOut { //nolint
				break
			}
			return err
		}
		_ = msg // consumed by the processing code omitted here
		// the rest of the implementation is omitted because it does not matter:
		// even when there is a message in the queue, ReadMessage hits `timeout`
	}

	return nil
}
When I produce 2 messages and consume them, the offsets are as below:
After calling the adjustment function and committing the new offsets, the offsets are updated as below, and the code breaks out of the for loop when it hits the timeout error:
My problem is that even though the offsets make sense and there is at least one message that has not been processed (based on the lag and offset), ReadMessage cannot fetch it. Any clue is appreciated.