pbathala

Reputation: 1410

How to get consumer group offsets for partition in Golang Kafka 10

Now that the Golang Kafka library (sarama) provides consumer group support with Kafka 10 without the help of any external library, how can I get the offset of the message currently being processed by a consumer group at any given time?

Previously I used kazoo-go (https://github.com/wvanbergen/kazoo-go) to get my consumer group's message offset, as it was stored in Zookeeper. Now that I use sarama-cluster (https://github.com/bsm/sarama-cluster), I am not sure which API to use to get my consumer group's message offset.

Upvotes: 4

Views: 7856

Answers (4)

Rambatino

Reputation: 4914

I've just been working on this myself. As @boris-burkov mentioned, you don't have access to the findPOM method; however, you can create a POM (PartitionOffsetManager) yourself and call NextOffset() to get the consumer group's actual offset:

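    // The first argument is the consumer group ID; cl.Client is an existing sarama.Client.
    // Errors are ignored here for brevity; check them in real code.
    offsetManager, _ := sarama.NewOffsetManagerFromClient(clientName, cl.Client)
    offsetPartitionManager, _ := offsetManager.ManagePartition("test-topic", 0)
    // NextOffset returns the next offset to consume and the committed metadata string.
    offset, _ := offsetPartitionManager.NextOffset()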
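
To turn the value from NextOffset() into "the last message the group has finished", here is a minimal sketch. It assumes the group commits offsets the way sarama's MarkMessage does (message offset + 1). The `nextOffsetReader` interface, the `lastProcessed` helper, and `fakePOM` are hypothetical names for illustration and not part of sarama; a `sarama.PartitionOffsetManager` should satisfy the interface because it exposes the same `NextOffset() (int64, string)` method. The fake stands in for a broker so the snippet runs on its own.

```go
package main

import "fmt"

// nextOffsetReader is a hypothetical interface; it mirrors the NextOffset
// method of sarama.PartitionOffsetManager so a real manager can be passed in.
type nextOffsetReader interface {
	NextOffset() (int64, string)
}

// lastProcessed returns the offset of the last message the group committed,
// assuming commits store "last message offset + 1". The second result is
// false when there is nothing to report: with no committed offset sarama
// returns the configured initial offset, a negative sentinel
// (OffsetNewest = -1, OffsetOldest = -2), and a next offset of 0 means no
// message has been consumed yet.
func lastProcessed(r nextOffsetReader) (int64, bool) {
	next, _ := r.NextOffset()
	if next <= 0 {
		return 0, false
	}
	return next - 1, true
}

// fakePOM stands in for a real partition offset manager in this sketch.
type fakePOM struct{ next int64 }

func (f fakePOM) NextOffset() (int64, string) { return f.next, "" }

func main() {
	if off, ok := lastProcessed(fakePOM{next: 42}); ok {
		fmt.Println("last processed offset:", off)
	}
	if _, ok := lastProcessed(fakePOM{next: -1}); !ok {
		fmt.Println("no committed offset yet")
	}
}
```

In real code you would pass the `offsetPartitionManager` from the snippet above instead of `fakePOM`.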

Upvotes: 0

Arsen

Reputation: 168

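Here's sample code to get the consumer group offset (i.e. the offset from which the consumer group will start consuming). Note that it records the initial offset of a single claimed partition, so on a multi-partition topic the result is whichever claim wrote last: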

    package main
    
    import (
        "context"
        "log"
        "strings"
    
        "github.com/Shopify/sarama"
    )
    
    func main() {
        groupName := "testgrp"
        topic := "topic_name"
        offset, e := GetCGOffset(context.Background(), "localhost:9092", groupName, topic)
        if e != nil {
            log.Fatal(e)
        }
        log.Printf("Consumer group %s offset for topic %s is: %d", groupName, topic, offset)
    }
    
    type gcInfo struct {
        offset int64
    }
    
    func (g *gcInfo) Setup(sarama.ConsumerGroupSession) error {
        return nil
    }
    
    func (g *gcInfo) Cleanup(sarama.ConsumerGroupSession) error {
        return nil
    }
    
    func (g *gcInfo) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
        g.offset = claim.InitialOffset()
        return nil
    }
    
    func GetCGOffset(ctx context.Context, brokers, groupName, topic string) (int64, error) {
        config := sarama.NewConfig()
        config.Consumer.Offsets.AutoCommit.Enable = false // we're not going to update the consumer group offsets
        client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), groupName, config)
        if err != nil {
            return 0, err
        }
        defer client.Close() // leave the group and release connections when done
        info := gcInfo{}
        if err := client.Consume(ctx, []string{topic}, &info); err != nil {
            return 0, err
        }
        return info.offset, nil
    }

Upvotes: 2

Boris Burkov

Reputation: 14506

Under the hood the consumerGroupSession struct is using PartitionOffsetManager to get next offset:

    if pom := s.offsets.findPOM(topic, partition); pom != nil {
        offset, _ = pom.NextOffset()
    }

Here is the documentation of pom.NextOffset().

When a consumerGroupSession constructs a consumerGroupClaim struct via newConsumerGroupClaim(), it passes the offset returned by pom.NextOffset() as the offset argument. You can access it later via claim.InitialOffset(). Once you have started consuming messages, you can read message.Offset from the message currently being processed.

Unfortunately, consumerGroupSession.offsets.findPOM() can't be accessed from the ConsumerGroupHandler.ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) method, because it receives session as a ConsumerGroupSession interface, not as a consumerGroupSession struct. The offsets field is therefore unexported and not accessible.

Thus we can't really access NextOffset() method, which does precisely what the OP wants.
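
Since message.Offset is available for every message inside ConsumeClaim, one workaround is to record it yourself. The following is only a sketch under that assumption: `offsetTracker`, `partitionKey`, `Record`, and `Current` are hypothetical names, not part of sarama, and the tracker only knows about messages handled by the current process. It is guarded by a mutex because sarama runs ConsumeClaim in a separate goroutine per claimed partition.

```go
package main

import (
	"fmt"
	"sync"
)

// partitionKey identifies one topic partition.
type partitionKey struct {
	topic     string
	partition int32
}

// offsetTracker is a hypothetical helper (not part of sarama) that remembers
// the offset of the message most recently handled on each partition.
type offsetTracker struct {
	mu      sync.RWMutex
	current map[partitionKey]int64
}

func newOffsetTracker() *offsetTracker {
	return &offsetTracker{current: make(map[partitionKey]int64)}
}

// Record would be called inside ConsumeClaim for every message, for example
// tracker.Record(msg.Topic, msg.Partition, msg.Offset).
func (t *offsetTracker) Record(topic string, partition int32, offset int64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.current[partitionKey{topic, partition}] = offset
}

// Current returns the last recorded offset; ok is false if the partition has
// not delivered a message to this process yet.
func (t *offsetTracker) Current(topic string, partition int32) (offset int64, ok bool) {
	t.mu.RLock()
	defer t.mu.RUnlock()
	offset, ok = t.current[partitionKey{topic, partition}]
	return offset, ok
}

func main() {
	tracker := newOffsetTracker()
	tracker.Record("test-topic", 0, 17) // simulated message at offset 17
	tracker.Record("test-topic", 0, 18) // simulated message at offset 18
	off, ok := tracker.Current("test-topic", 0)
	fmt.Println(off, ok)
}
```

Any other goroutine (a metrics endpoint, for instance) can then call Current to see how far the handler has progressed on a partition.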

Upvotes: 1

Sarwesh Suman

Reputation: 149

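I am also working with Sarama and Kafka to get the offset of a topic.

You can get the newest offset of a partition with the following code. Note that this is the end of the partition's log (the offset the next produced message will receive), not the offset committed by a consumer group.

    package main

    import (
        "fmt"

        "github.com/Shopify/sarama"
    )

    func main() {
        // A nil config makes sarama fall back to its default configuration.
        client, err := sarama.NewClient([]string{"localhost:9092"}, nil)
        if err != nil {
            panic(err)
        }
        defer client.Close()

        // OffsetNewest asks for the offset the next produced message will get.
        newest, err := client.GetOffset("topic-test", 0, sarama.OffsetNewest)
        if err != nil {
            panic(err)
        }
        fmt.Println("Newest offset of partition 0:", newest)
    }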

Let me know if this is the answer you are looking for and if it is helpful.
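
Because GetOffset with sarama.OffsetNewest reports the end of the partition's log rather than a group's position, one use for it is estimating consumer lag once the group's next offset is known from elsewhere (for example from PartitionOffsetManager.NextOffset). The sketch below assumes exactly that; `offsetGetter`, `consumerLag`, and `fakeClient` are hypothetical names, and the interface simply mirrors the `GetOffset` method of `sarama.Client` so that a real client could be passed in.

```go
package main

import (
	"errors"
	"fmt"
)

// offsetNewest mirrors sarama.OffsetNewest (-1), which asks the broker for
// the offset that the next produced message will receive.
const offsetNewest int64 = -1

// offsetGetter is a hypothetical interface matching the GetOffset method of
// sarama.Client.
type offsetGetter interface {
	GetOffset(topic string, partitionID int32, time int64) (int64, error)
}

// consumerLag reports how many messages lie between the group's next offset
// to consume and the end of the partition log.
func consumerLag(c offsetGetter, topic string, partition int32, groupNext int64) (int64, error) {
	if groupNext < 0 {
		return 0, errors.New("group has no committed offset for this partition")
	}
	end, err := c.GetOffset(topic, partition, offsetNewest)
	if err != nil {
		return 0, fmt.Errorf("fetching log-end offset: %w", err)
	}
	return end - groupNext, nil
}

// fakeClient stands in for a broker connection in this sketch.
type fakeClient struct{ end int64 }

func (f fakeClient) GetOffset(string, int32, int64) (int64, error) { return f.end, nil }

func main() {
	lag, err := consumerLag(fakeClient{end: 120}, "topic-test", 0, 100)
	if err != nil {
		panic(err)
	}
	fmt.Println("lag:", lag)
}
```

A lag of zero means the group has caught up with the producer on that partition.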

Upvotes: 0
