B3ns44d

Reputation: 886

Issues with KafkaSource Adapter in Knative Eventing: Claims Update Timeout and Packet Decoding Errors

I'm currently facing challenges with my Knative Eventing setup, specifically with the KafkaSource adapter: it keeps logging errors about claims update timeouts and packet length decoding failures. The detailed error messages from the KafkaSource adapter logs are as follows:

The current Knative release versions being used are:

Upon investigating further, I have pinpointed potential sources of the errors in the Knative codebase. For the claims update error, the related code can be found in consumer/consumer_handler.go, and the relevant code from the Knative control-protocol is in pkg/service/service.go.

Here is the code from consumer/consumer_handler.go:

// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *SaramaConsumerHandler) Setup(session sarama.ConsumerGroupSession) error {
    consumer.logger.Info("setting up handler")
    consumer.lifecycleListener.Setup(session)
    return nil
}

func (a *Adapter) Setup(sess sarama.ConsumerGroupSession) {
    if a.controlServer != nil {
        if err := a.controlServer.SendAndWaitForAck(kafkasourcecontrol.NotifySetupClaimsOpCode, kafkasourcecontrol.Claims(sess.Claims())); err != nil {
            a.logger.Warnf("Cannot send the claims update: %v", err)
        }
    }

    // Preemptively initialize consumer group offsets to be able to mark the source as ready
    // as soon as possible.
    if err := a.InitOffsets(sess); err != nil {
        a.logger.Warnf("Cannot initialized consumer group offsets: %v", err)
    }
}

And the code from pkg/service/service.go:

func (c *service) SendAndWaitForAck(opcode ctrl.OpCode, payload encoding.BinaryMarshaler) error {
    b, err := payload.MarshalBinary()
    if err != nil {
        return err
    }
    return c.sendBinaryAndWaitForAck(opcode, b)
}

func (c *service) sendBinaryAndWaitForAck(opcode ctrl.OpCode, payload []byte) error {
    if opcode == ctrl.AckOpCode {
        return fmt.Errorf("you cannot send an ack manually")
    }
    msg := ctrl.NewOutboundMessage(uint8(opcode), payload)

    logging.FromContext(c.ctx).Debugf("Going to send message with opcode %d and uuid %s", msg.OpCode(), msg.UUID().String())

    // Register the ack between the waiting acks
    ackCh := make(chan interface{}, 1)
    c.waitingAcksMutex.Lock()
    c.waitingAcks[msg.UUID()] = ackCh
    c.waitingAcksMutex.Unlock()

    defer func() {
        c.waitingAcksMutex.Lock()
        delete(c.waitingAcks, msg.UUID())
        c.waitingAcksMutex.Unlock()
    }()

    c.connection.OutboundMessages() <- &msg

    select {
    case <-ackCh:
        return nil
    case <-c.ctx.Done():
        logging.FromContext(c.ctx).Warnf("Dropping message because context cancelled: %s", msg.UUID().String())
        return c.ctx.Err()
    case <-time.After(controlServiceSendTimeout):
        logging.FromContext(c.ctx).Debugf("Timeout waiting for the ack: %s", msg.UUID().String())
        return fmt.Errorf("timeout exceeded for outgoing message: %s", msg.UUID().String())
    }
}
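
For reference, the timeout in my logs comes from the last branch of that select: the claims notification is sent over the control connection, but the peer never acknowledges it within controlServiceSendTimeout. Below is a minimal, self-contained sketch of that same "register an ack channel, send, wait with a timeout" pattern, just to illustrate where the failure surfaces. The names (pendingAcks, sendAndWait) and the 2-second timeout are my own stand-ins, not Knative APIs; the real timeout constant lives in the control-protocol package.

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// pendingAcks mimics the waitingAcks map in the control-protocol service:
// every outgoing message registers a channel and waits for the peer's ack on it.
type pendingAcks struct {
    mu   sync.Mutex
    acks map[string]chan struct{}
}

func (p *pendingAcks) sendAndWait(ctx context.Context, id string, send func()) error {
    ackCh := make(chan struct{}, 1)

    // Register the pending ack before sending, exactly like sendBinaryAndWaitForAck does.
    p.mu.Lock()
    p.acks[id] = ackCh
    p.mu.Unlock()
    defer func() {
        p.mu.Lock()
        delete(p.acks, id)
        p.mu.Unlock()
    }()

    send()

    const sendTimeout = 2 * time.Second // stand-in; the real value is controlServiceSendTimeout

    select {
    case <-ackCh:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    case <-time.After(sendTimeout):
        // If the peer never answers, this branch yields the same kind of
        // "timeout exceeded for outgoing message" error seen in the adapter logs.
        return fmt.Errorf("timeout exceeded for outgoing message: %s", id)
    }
}

// ack is what the inbound-message handler would call once the ack arrives.
func (p *pendingAcks) ack(id string) {
    p.mu.Lock()
    defer p.mu.Unlock()
    if ch, ok := p.acks[id]; ok {
        ch <- struct{}{}
    }
}

func main() {
    p := &pendingAcks{acks: map[string]chan struct{}{}}
    // Simulate a peer that never acknowledges: after the timeout we get the error.
    err := p.sendAndWait(context.Background(), "msg-1", func() { /* write to the connection here */ })
    fmt.Println(err)
}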

To further investigate and isolate the issue, I created a standalone Go program that implements a Kafka consumer using the Sarama library. The goal was to simulate the behavior of the KafkaSource adapter and observe whether similar issues would arise.

The program consumes messages from the same Kafka topic that was causing issues in the original setup, and it was deployed in a pod within the same environment as the KafkaSource.

The code implements a plain Kafka consumer that reads from the topic through a consumer group, which is the same mechanism KafkaSource uses. The key point is that the consumer runs under the same environmental conditions and configuration as the KafkaSource, to keep the comparison as close as possible.

Here's the code I used:

main.go:

package main

import (
    "context"
    "crypto/tls"
    "crypto/x509"
    "flag"
    "fmt"
    "log"
    "os"
    "os/signal"
    "strings"
    "sync"
    "syscall"

    "github.com/Shopify/sarama"
)

func init() {
    sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags)
}

var (
    brokers       = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The Kafka brokers to connect to, as a comma separated list")
    userName      = flag.String("username", "", "The SASL username")
    passwd        = flag.String("passwd", "", "The SASL password")
    algorithm     = flag.String("algorithm", "", "The SASL SCRAM SHA algorithm sha256 or sha512 as mechanism")
    topic         = flag.String("topic", "default_topic", "The Kafka topic to use")
    certFile      = flag.String("certificate", "", "The optional certificate file for client authentication")
    keyFile       = flag.String("key", "", "The optional key file for client authentication")
    caFile        = flag.String("ca", "", "The optional certificate authority file for TLS client authentication")
    tlsSkipVerify = flag.Bool("tls-skip-verify", true, "Whether to skip TLS server cert verification")
    useTLS        = flag.Bool("tls", true, "Use TLS to communicate with the cluster")
    mode          = flag.String("mode", "consume", "Mode to run in: \"produce\" to produce, \"consume\" to consume")
    logMsg        = flag.Bool("logmsg", true, "True to log consumed messages to console")

    logger = log.New(os.Stdout, "[Consumer] ", log.LstdFlags)
)

func createTLSConfiguration() (t *tls.Config) {
    t = &tls.Config{
        InsecureSkipVerify: *tlsSkipVerify,
    }
    if *certFile != "" && *keyFile != "" && *caFile != "" {
        cert, err := tls.LoadX509KeyPair(*certFile, *keyFile)
        if err != nil {
            log.Fatal(err)
        }

        caCert, err := os.ReadFile(*caFile)
        if err != nil {
            log.Fatal(err)
        }

        caCertPool := x509.NewCertPool()
        caCertPool.AppendCertsFromPEM(caCert)

        t = &tls.Config{
            Certificates:       []tls.Certificate{cert},
            RootCAs:            caCertPool,
            InsecureSkipVerify: *tlsSkipVerify,
        }
    }
    return t
}

func main() {
    flag.Parse()

    if *brokers == "" {
        log.Fatalln("at least one broker is required")
    }
    splitBrokers := strings.Split(*brokers, ",")

    if *userName == "" {
        log.Fatalln("SASL username is required")
    }

    if *passwd == "" {
        log.Fatalln("SASL password is required")
    }

    conf := sarama.NewConfig()
    conf.Producer.Retry.Max = 1
    conf.Producer.RequiredAcks = sarama.WaitForAll
    conf.Producer.Return.Successes = true
    conf.Metadata.Full = true
    conf.Version = sarama.V0_10_2_0
    conf.ClientID = "sasl_scram_client"
    conf.Net.SASL.Enable = true
    conf.Net.SASL.User = *userName
    conf.Net.SASL.Password = *passwd
    conf.Net.SASL.Handshake = true
    if *algorithm == "sha512" {
        conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
        conf.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
    } else if *algorithm == "sha256" {
        conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA256} }
        conf.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256

    } else {
        log.Fatalf("invalid SHA algorithm \"%s\": can be either \"sha256\" or \"sha512\"", *algorithm)
    }

    if *useTLS {
        conf.Net.TLS.Enable = true
        conf.Net.TLS.Config = createTLSConfiguration()
    }

    client, err := sarama.NewClient(splitBrokers, conf)
    if err != nil {
        log.Fatalf("unable to create kafka client: %q\n", err)
    }
    fmt.Print("client created", client)

    ctx, cancel := context.WithCancel(context.Background())
    consumerClient, err := sarama.NewClient(splitBrokers, conf)
    if err != nil {
        log.Panicf("Error creating consumer client: %v", err)
    }
    defer func(consumerClient sarama.Client) {
        err := consumerClient.Close()
        if err != nil {
            print(err)
        }
    }(consumerClient)

    consumerGroup, err := sarama.NewConsumerGroupFromClient("knative-consumer-group-name", consumerClient)
    if err != nil {
        log.Panicf("Error creating consumer group client: %v", err)
    }

    consumer := Consumer{
        ready: make(chan bool),
    }
    wg := &sync.WaitGroup{}
    wg.Add(1)
    go func() {
        defer wg.Done()
        for {
            // `Consume` should be called inside an infinite loop, when a
            // server-side rebalance happens, the consumer session will need to be
            // recreated to get the new claims
            topics := strings.Split(*topic, ",")
            if err := consumerGroup.Consume(ctx, topics, &consumer); err != nil {
                log.Panicf("Error from consumer: %v", err)
            }
            // check if context was cancelled, signaling that the consumer should stop
            if ctx.Err() != nil {
                return
            }
            consumer.ready = make(chan bool)
        }
    }()

    <-consumer.ready // Await till the consumer has been set up
    log.Println("Sarama consumer up and running!...")

    sigterm := make(chan os.Signal, 1)
    signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
    select {
    case <-ctx.Done():
        log.Println("terminating: context cancelled")
    case <-sigterm:
        log.Println("terminating: via signal")
    }
    cancel()
    wg.Wait()
    if err = client.Close(); err != nil {
        log.Panicf("Error closing client: %v", err)
    }

    logger.Println("Bye now!")
}

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
    ready chan bool
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
    // Mark the consumer as ready
    close(consumer.ready)
    return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
    return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    // NOTE:
    // Do not move the code below to a goroutine.
    // The `ConsumeClaim` itself is called within a goroutine, see:
    // https://github.com/Shopify/sarama/blob/main/consumer_group.go#L27-L29
    for {
        select {
        case message, ok := <-claim.Messages():
            if !ok {
                // The Messages() channel was closed (e.g. during a rebalance); exit the claim loop.
                return nil
            }
            log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
            session.MarkMessage(message, "")

        // Should return when `session.Context()` is done.
        // If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
        // https://github.com/Shopify/sarama/issues/1192
        case <-session.Context().Done():
            return nil
        }
    }
}

scram_client.go:

package main

import (
    "crypto/sha256"
    "crypto/sha512"

    "github.com/xdg-go/scram"
)

var (
    SHA256 scram.HashGeneratorFcn = sha256.New
    SHA512 scram.HashGeneratorFcn = sha512.New
)

type XDGSCRAMClient struct {
    *scram.Client
    *scram.ClientConversation
    scram.HashGeneratorFcn
}

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
    x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
    if err != nil {
        return err
    }
    x.ClientConversation = x.Client.NewConversation()
    return nil
}

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
    response, err = x.ClientConversation.Step(challenge)
    return
}

func (x *XDGSCRAMClient) Done() bool {
    return x.ClientConversation.Done()
}

I've used the same Sarama version in my code that the KafkaSource uses. Here is the relevant part of my go.mod:

go 1.19

require (
    github.com/Shopify/sarama v1.30.1
    github.com/xdg-go/scram v1.0.2
)
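
For completeness, this is roughly how I invoked the program inside the pod; the broker address, credentials and topic below are placeholders for my actual values:

go run . -brokers=my-broker:9093 -username=my-user -passwd=my-password -algorithm=sha512 -topic=my-topic

TLS is enabled by default via the -tls flag, and SASL/SCRAM credentials are required, matching how my cluster is configured.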

Surprisingly, the consumer program worked as expected without encountering any of the previous errors. It consumed messages from the Kafka topic successfully, with no sign of claims update timeouts or packet length decoding problems.

What could cause the KafkaSource adapter to hit these claims update timeouts and packet decoding errors when a standalone Sarama consumer running in the same environment and with the same configuration works fine? Any pointers on what to check next would be appreciated.

Thanks in advance.

Upvotes: 1

Views: 266

Answers (0)
