Maryam

Reputation: 720

Kafka `ReadMessage` times out even though there are messages in the queue

Right after creating a kafka.Consumer instance, I want to adjust the offsets with the following code: decrement the committed offset of every partition assigned to this specific consumer (group member) and ensure the previous messages are processed. The offsets are updated after the commit; however, ReadMessage returns a timeout error when it is called.

import (
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func AdjustConsumerOffset(c *kafka.Consumer) error { 
    const TimeOut = 100

    balancing := true

    for balancing {
        c.Poll(TimeOut)

        assignments, err := c.Assignment()
        if err != nil {
            return err
        }

        if len(assignments) > 0 {
            balancing = false
        }
    }
    // get consumer assigned partitions
    assignments, err := c.Assignment()
    if err != nil {
        return err
    }

    assignments, err = c.Committed(assignments, TimeOut)
    if err != nil {
        return err
    }

    var decreasePartitions []kafka.TopicPartition 

    partitionsMap := make(map[int32]kafka.TopicPartition)
    
    // decrement each partition's committed offset by one if it is greater than zero
    for _, partition := range assignments {
        if partition.Offset > 0 {
            partition.Offset--
            partitionsMap[partition.Partition] = partition
        }

        decreasePartitions = append(decreasePartitions, partition)
    }
    
    // commit new offsets
    if _, err := c.CommitOffsets(decreasePartitions); err != nil {
        return err
    }

    for len(partitionsMap) > 0 {
        msg, err := c.ReadMessage(TimeOut * time.Second)
        if err != nil {
            // comma-ok assertion avoids a panic if err is not a kafka.Error
            if kerr, ok := err.(kafka.Error); ok && kerr.Code() == kafka.ErrTimedOut {
                break
            }

            return err
        }
        // the rest of the processing is omitted because it is not relevant: ReadMessage times out even though there is a message in the queue
        _ = msg
    }

    return nil
}

After producing 2 messages and consuming them, the offsets are as follows:

[screenshot: consumer group offsets]

After calling the adjustment function and committing the new offsets, the offsets are updated as follows, and the code breaks out of the for loop when it hits the timeout error:

[screenshot: consumer group offsets after the adjustment]

My problem is that even though the offsets make sense and there is at least one message that has not been processed (based on the lag), ReadMessage cannot fetch it. Any clue is appreciated.
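
To make the lag reasoning concrete, here is a minimal sketch of the arithmetic only. It does not talk to Kafka. `partitionState`, `rewindByOne`, and `lag` are hypothetical names introduced for illustration, not part of confluent-kafka-go, and the numbers assume both messages landed on a single partition (the real values are in the screenshots).

```go
package main

import "fmt"

// partitionState is a hypothetical stand-in for one row of a consumer-group
// offset report; it is not a type from confluent-kafka-go.
type partitionState struct {
	Partition    int32
	Committed    int64 // committed offset, i.e. the next offset the group should read
	LogEndOffset int64 // offset the next produced message will receive
}

// rewindByOne mirrors the adjustment in the question: decrement the
// committed offset by one, but only when it is greater than zero.
func rewindByOne(p partitionState) partitionState {
	if p.Committed > 0 {
		p.Committed--
	}
	return p
}

// lag is the number of messages between the committed offset and the log end.
func lag(p partitionState) int64 {
	return p.LogEndOffset - p.Committed
}

func main() {
	// Assumed state after producing and consuming 2 messages on partition 0.
	before := partitionState{Partition: 0, Committed: 2, LogEndOffset: 2}
	after := rewindByOne(before)

	fmt.Println("lag before:", lag(before)) // prints "lag before: 0"
	fmt.Println("lag after:", lag(after))   // prints "lag after: 1"
}
```

Under these assumed numbers the reported lag goes from 0 to 1 after the commit, which is why I expect exactly one message to be delivered again.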

Upvotes: 2

Views: 866

Answers (0)
