Reputation: 462
I'm trying to test rebalancing consumers. I've read about consumer groups and they don't work as I expected. I have a two consumers running in the same group like so:
package main
import (
"errors"
"net"
"os"
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/sirupsen/logrus"
)
type Callback func(*kafka.Message) error
type CallbackMap map[string]Callback
type Consumer struct {
consumer *kafka.Consumer
name string
callbackMap CallbackMap
}
func NewKafkaConsumer(topics []string, name string) (*Consumer, error) {
bootstrap := os.Getenv("KAFKA_HOSTNAME")
consumer, err := kafka.NewConsumer(
&kafka.ConfigMap{
"bootstrap.servers": bootstrap,
"group.id": "testing",
"security.protocol": "plaintext",
"go.events.channel.enable": true,
},
)
if err != nil {
logrus.WithError(err).Fatal("failed to create consumer")
return nil, err
}
if err := consumer.SubscribeTopics(topics, nil); err != nil {
logrus.WithError(err).Fatal("failed to subscribe to topics")
return nil, err
}
return &Consumer{
consumer: consumer,
name: name,
callbackMap: make(CallbackMap),
}, nil
}
func (c *Consumer) Subscribe(key string, callback Callback) {
c.callbackMap[key] = callback
}
func (c *Consumer) UnSubscribe(key string) {
delete(c.callbackMap, key)
}
func (c *Consumer) HandleMessage(message *kafka.Message, chano chan error) {
if message.TopicPartition.Error != nil {
logrus.WithError(message.TopicPartition.Error).Warning()
return
}
logrus.Errorf("CONSUMER NAME IS =========== %s", c.name)
logrus.Errorf("MESSAGE is ============: %s", message)
logrus.Errorf("VALUE IS =============: %s", string(message.Value))
logrus.Errorf("PARTITION IS =========== %d", message.TopicPartition.Partition)
if string(message.Value) == "20" {
logrus.Errorf("Exited consumer: %s", c.name)
chano <- errors.New("err")
return
}
}
func (c *Consumer) Consume() {
logrus.Errorf("entering consumer: %s", c.name)
chano := make(chan error)
for event := range c.consumer.Events() {
select {
case _ = <-chano:
err := c.consumer.Close()
if err != nil {
logrus.Errorf("Failed to close consumer")
return
}
return
default:
switch e := event.(type) {
case *kafka.Message:
c.HandleMessage(e, chano)
case kafka.Error:
logrus.WithError(e).Warning("an error occurred while reading from topic")
default:
// Ignore other event types
}
}
}
}
func main() {
kafkaConsumer1, err := NewKafkaConsumer([]string{"test_topic"}, "one")
kafkaConsumer2, err := NewKafkaConsumer([]string{"test_topic"}, "two")
logrus.Errorf("started")
if err != nil {
logrus.Errorf("err %s", err)
return
}
go kafkaConsumer1.Consume()
go kafkaConsumer2.Consume()
select {}
}
I am producing messages with sequential values. I expect that once the value hits 20, the consumer will stop and close and rebalancing will occur, with the other consumer starting to listen. This doesn't happen - once 20 hits, all consuming stops. Why is this happening? For example, consumer one started and the log I get is:
ERRO[0078] CONSUMER NAME IS =========== one
ERRO[0078] MESSAGE is ============: test_topic[3]@947
ERRO[0078] VALUE IS =============: 20
ERRO[0078] PARTITION IS =========== 3
ERRO[0078] Exited consumer: one
And that's it.
Thanks
Upvotes: 1
Views: 367