Reputation: 115
The task is to prevent multiple instances of an application from writing duplicate messages to Kafka. The implementation language is Go. I saw a solution that mentioned a "producer group", but I couldn't find such a feature; groups seem to exist only for consumers (which is odd). Can you advise on how to implement this?
Upvotes: -1
Views: 33
Reputation: 1
Actually, there isn't a "producer group" feature in Kafka, but exactly-once semantics can be implemented in Go. The key is Kafka transactions combined with an idempotent producer. One caveat: idempotence only deduplicates broker-side retries within a single producer session, so to stop several instances of the same logical producer from committing duplicates, they must all share a stable transactional ID; Kafka then fences off stale ("zombie") instances whenever a new one initializes. A working example using Sarama (its SyncProducer gained transaction support in v1.32.0):
package main

import (
	"github.com/Shopify/sarama" // the project now lives at github.com/IBM/sarama
)

type Producer struct {
	producer sarama.SyncProducer
	topic    string
}

// NewProducer builds a transactional, idempotent SyncProducer.
// Every replica of the same logical producer should pass the same
// transactionalID so Kafka can fence off stale instances.
func NewProducer(brokers []string, topic, transactionalID string) (*Producer, error) {
	config := sarama.NewConfig()
	config.Version = sarama.V0_11_0_0 // idempotence and transactions require Kafka >= 0.11

	// Enable idempotent producer
	config.Producer.Idempotent = true
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Return.Successes = true // required by SyncProducer
	config.Net.MaxOpenRequests = 1          // required for idempotence

	// Enable transactions
	config.Producer.Transaction.ID = transactionalID
	config.Producer.Transaction.Retry.Max = 3

	producer, err := sarama.NewSyncProducer(brokers, config)
	if err != nil {
		return nil, err
	}
	return &Producer{
		producer: producer,
		topic:    topic,
	}, nil
}

func (p *Producer) SendMessage(key string, value []byte) error {
	// Begin transaction
	if err := p.producer.BeginTxn(); err != nil {
		return err
	}
	msg := &sarama.ProducerMessage{
		Topic: p.topic,
		Key:   sarama.StringEncoder(key),
		Value: sarama.ByteEncoder(value),
	}
	if _, _, err := p.producer.SendMessage(msg); err != nil {
		// Roll back so the failed message is never exposed to
		// read-committed consumers; report the abort error if any.
		if abortErr := p.producer.AbortTxn(); abortErr != nil {
			return abortErr
		}
		return err
	}
	// Commit transaction
	return p.producer.CommitTxn()
}
Upvotes: 1