Reputation: 1410
Now that Golang Kafka library (sarama) is providing consumer group capability without any external library help with kafka 10. How can I get the current message offset being processed by a consumer group at any given time ?
Previously I used kazoo-go (https://github.com/wvanbergen/kazoo-go) to get my consumer group message offset as it is stored in Zookeeper. Now I use sarama-cluster (https://github.com/bsm/sarama-cluster), I am not sure which API to use to get my consumer group message offset.
Upvotes: 4
Views: 7856
Reputation: 4914
I've just been doing work on this myself. As @boris-burkov mentioned you don't have access to the getPOM method, however, you can create a POM yourself and called NextOffset() to get the current consumer's actual offset:
offsetManager, _ := sarama.NewOffsetManagerFromClient(clientName, cl.Client)
offsetPartitionManager, _ := offsetManager.ManagePartition("test-topic", 0)
offsetPartitionManager.NextOffset()
Upvotes: 0
Reputation: 168
Here's a sample code to get the consumer group offset (i.e. the offset where the consumer group will start):
package main
import (
"context"
"log"
"strings"
"github.com/Shopify/sarama"
)
func main() {
groupName := "testgrp"
topic := "topic_name"
offset, e := GetCGOffset(context.Background(), "localhost:9092", groupName, topic)
if e != nil {
log.Fatal(e)
}
log.Printf("Consumer group %s offset for topic %s is: %d", groupName, topic, offset)
}
type gcInfo struct {
offset int64
}
func (g *gcInfo) Setup(sarama.ConsumerGroupSession) error {
return nil
}
func (g *gcInfo) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
func (g *gcInfo) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
g.offset = claim.InitialOffset()
return nil
}
func GetCGOffset(ctx context.Context, brokers, groupName, topic string) (int64, error) {
config := sarama.NewConfig()
config.Consumer.Offsets.AutoCommit.Enable = false // we're not going to update the consumer group offsets
client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), groupName, config)
if err != nil {
return 0, err
}
info := gcInfo{}
if err := client.Consume(ctx, []string{topic}, &info); err != nil {
return 0, err
}
return info.offset, nil
}
Upvotes: 2
Reputation: 14506
Under the hood the consumerGroupSession
struct is using PartitionOffsetManager
to get next offset:
if pom := s.offsets.findPOM(topic, partition); pom != nil {
offset, _ = pom.NextOffset()
}
Here is the documentation of pom.NextOffset().
When a consumerGroupSession
constructs a consumerGroupClaim
struct via newConsumerGroupClaim()
method, it passes offset, returned by pom.NextOffset()
, as offset
argument. You can access it later via claim.InitialOffset()
. After you started consuming messages, you can use message.Offset
of the currently processed message.
Unfortunately, consumerGroupSession.offsets.findPOM()
can't be accessed from ConsumerGroupHandler.ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim)
method, because it receives session as a ConsumerGroupSession
interface, not as consumerGroupSession
struct. So the offsets variable is private and not accessible.
Thus we can't really access NextOffset()
method, which does precisely what the OP wants.
Upvotes: 1
Reputation: 149
I am also working with Sarama and Kafka to get offset of a topic.
You can get offset with following code.
package main
import (
"gopkg.in/Shopify/sarama"
"fmt"
)
func main(){
client , err := sarama.Client([]string{"localhost:9092"},nil) // I am not giving any configuration
if err != nil {
panic(err)
}
lastoffset, err := client.GetOffset("topic-test",0,sarama.OffsetNewest)
if err != nil {
panic(err)
}
fmt.Println("Last Commited Offset ",lastoffset)
}
Let me know if this is the answer you are looking for and if it is helpful.
Upvotes: 0