user16158841

security.protocol error when setting up basic Kafka consumer and producer in Go?

I am attempting to set up a basic Kafka client in Go - following the examples detailed here https://docs.confluent.io/clients-confluent-kafka-go/current/overview.html#go-example-code and https://github.com/confluentinc/confluent-kafka-go.

I wrote the consumer and producer examples exactly as they were given:

func Produce() {

    p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "my-broker-name"})
    if err != nil {
        panic(err)
    }

    defer p.Close()

    go func() {
        for e := range p.Events() {
            switch ev := e.(type) {
            case *kafka.Message:
                if ev.TopicPartition.Error != nil {
                    fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
                } else {
                    fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
                }
            }
        }
    }()

    topic := "myTopic"
    for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
        p.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Value:          []byte(word),
        }, nil)
    }

    p.Flush(15 * 1000)
}

func Consume() {

    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": "my-broker-name",
        "group.id":          "myGroup",
        "auto.offset.reset": "earliest",
    })
    if err != nil {
        panic(err)
    }
    // Deferred so it actually runs; a Close placed after the endless loop is unreachable.
    defer c.Close()

    // Subscribing can fail, so check the returned error instead of dropping it.
    if err := c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil); err != nil {
        panic(err)
    }

    for {
        msg, err := c.ReadMessage(-1)
        if err == nil {
            fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
        } else {
            fmt.Printf("Consumer error: %v (%v)\n", err, msg)
        }
    }
}

(my-broker-name is a substitute for my hostname + port, which I didn't want to include here)

However, when I run the Produce function, it returns this error:

Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 31ms in state APIVERSION_QUERY)

When I run the Consume function, I receive the same error, along with

Consumer error: 1/1 brokers are down (<nil>)

which I am certain is not the case.

Unfortunately, I can't find any documentation on what these errors mean or how to approach fixing them. How do I resolve them so that I can produce to and consume from my broker?

UPDATE:

I obtained my certificate and converted it to a .pem file, and changed the ConfigMap to the following:

p, err := kafka.NewProducer(&kafka.ConfigMap{
    "bootstrap.servers":        "my-broker:32500",
    "security.protocol":        "SSL",
    "ssl.certificate.location": "mycert.pem",
    "ssl.ca.location":          "ca-chain.pem",
})
if err != nil {
    panic(err)
}

However, this now returns:

client SSL authentication might be required (see ssl.key.location and ssl.certificate.location and consult the broker logs for more information)

Does this mean that there is a problem with the certificate, or is there a step that I am missing somewhere?

Upvotes: 4

Views: 15033

Answers (2)

user16158841

The problem was that I was missing ssl.key.location, the private key that goes with the client certificate. I had to ask my administrator for the key, and once I included it everything worked. My final configuration looked like this:

c, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers":        "hostname:port-number",
    "security.protocol":        "SSL",
    "ssl.ca.location":          "ca-chain.pem",
    "ssl.key.location":         "key-location",
    "ssl.certificate.location": "mycert.pem",
})
if err != nil {
    panic(err)
}
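A mistake like this can be caught before the client is even created. The following is only a minimal sketch, assuming the settings are held in a plain map[string]string rather than a kafka.ConfigMap. missingClientAuthSetting is a hypothetical helper, not part of confluent-kafka-go, and it only looks at the file-based ssl.certificate.location and ssl.key.location settings.

```go
package main

import "fmt"

// missingClientAuthSetting is a hypothetical helper (not part of
// confluent-kafka-go). It returns the name of the setting that is missing
// when only one half of the client certificate/key pair is configured,
// or "" when both or neither are set.
func missingClientAuthSetting(cfg map[string]string) string {
	cert := cfg["ssl.certificate.location"]
	key := cfg["ssl.key.location"]
	switch {
	case cert != "" && key == "":
		return "ssl.key.location"
	case key != "" && cert == "":
		return "ssl.certificate.location"
	}
	return ""
}

func main() {
	// The configuration from the question's update: certificate but no key.
	cfg := map[string]string{
		"bootstrap.servers":        "my-broker:32500",
		"security.protocol":        "SSL",
		"ssl.certificate.location": "mycert.pem",
		"ssl.ca.location":          "ca-chain.pem",
	}
	if name := missingClientAuthSetting(cfg); name != "" {
		fmt.Println("missing:", name) // prints "missing: ssl.key.location"
	}
}
```

This only checks that both settings are present. It does not verify that the key actually belongs to the certificate.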

Upvotes: 0

Ran Lupovich

Reputation: 1831

You need to provide both the hostname and the port in your bootstrap servers setting:

  "bootstrap.servers": "host1:9092"
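If it helps, the host:port shape can be checked up front with the standard library. This is only a sketch: checkBootstrapServers is a hypothetical helper, not part of confluent-kafka-go, and it treats a missing port as an error on the assumption that you always want the port to be explicit.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// checkBootstrapServers is a hypothetical helper. It splits a
// comma-separated bootstrap.servers value and verifies that every
// entry has the form host:port with both parts non-empty.
func checkBootstrapServers(servers string) error {
	for _, entry := range strings.Split(servers, ",") {
		entry = strings.TrimSpace(entry)
		host, port, err := net.SplitHostPort(entry)
		if err != nil {
			return fmt.Errorf("%q is not host:port: %w", entry, err)
		}
		if host == "" || port == "" {
			return fmt.Errorf("%q has an empty host or port", entry)
		}
	}
	return nil
}

func main() {
	for _, s := range []string{"host1:9092", "host1:9092,host2:9092", "my-broker-name"} {
		if err := checkBootstrapServers(s); err != nil {
			fmt.Println("invalid:", err)
		} else {
			fmt.Println("ok:", s)
		}
	}
}
```

This is purely a syntax check. It does not resolve the hostname or contact the broker.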

To connect to a secured port in Kafka, the client needs a truststore configuration that contains your CA file (ssl.ca.location for this client). The same is true of any application that makes a secured connection.

https://www.google.com/amp/s/www.process-one.net/blog/using-tls-authentication-for-your-go-kafka-client/%3famp

https://github.com/FluuxIO/kafka/blob/master/examples/base-client/base-client.go#L6

kafka.ConfigMap{
    "bootstrap.servers": "..",
    "security.protocol": "SSL",
    // If you're using SSL authentication, provide the client's key here
    "ssl.key.location":         "path-to-private-key.pem",
    "ssl.certificate.location": "path-to-public-key.pem",
    "ssl.key.password":         "if any..",
}

For your new error, see this question:

What does "SSL_CTX_use_PrivateKey_file" "problems getting password error" indicate in Nginx error log?

Upvotes: 1
