Dikshant Adhikari

Reputation: 662

Sarama cannot talk to the Kafka server

I am trying to configure a producer client with Sarama (a native Go client for Kafka). I have configured TLS accordingly, making sure the client certs were generated using the proper ciphers. My Go code to init the client looks like this:

import (
    "crypto/tls"
    "crypto/x509"
    "encoding/pem"
    "io/ioutil"
    "net"
    "path/filepath"

    "github.com/Shopify/sarama"
    log "github.com/sirupsen/logrus"
)

const (
    certFile = "client_ingestion_client.pem"
    keyFile  = "client_ingestion_client.key"
)

func InitKafkaClient(host string, port string, certPath string) (sarama.AsyncProducer, error) {

    cf := filepath.Join(certPath, certFile)
    kf := filepath.Join(certPath, keyFile)

    // Log cert and key path
    log.Debugln(cf)
    log.Debugln(kf)

    // Read the cert in
    certIn, err := ioutil.ReadFile(cf)

    if err != nil {
        log.Error("cannot read cert", err)
        return nil, err
    }

    // Read & decode the encrypted key file with the pass to make tls work
    keyIn, err := ioutil.ReadFile(kf)
    if err != nil {
        log.Error("cannot read key", err)
        return nil, err
    }

    // Decode and decrypt our PEM block as DER
    decodedPEM, _ := pem.Decode([]byte(keyIn))
    decrypedPemBlock, err := x509.DecryptPEMBlock(decodedPEM, []byte("m4d3ups3curity4k4fka?"))
    if err != nil {
        log.Error("cannot decrypt pem block", err)
        return nil, err
    }

    // Parse the decrypted DER bytes as a PKCS#1 RSA private key
    rsaKey, err := x509.ParsePKCS1PrivateKey(decrypedPemBlock)
    if err != nil {
        log.Error("failed to parse rsa key", err)
        return nil, err
    }

    // Re-encode the decrypted RSA key as an unencrypted PEM block in memory.
    // pem.EncodeToMemory returns no error, so there is nothing to check here.
    pemdata := pem.EncodeToMemory(
        &pem.Block{
            Type:  "RSA PRIVATE KEY",
            Bytes: x509.MarshalPKCS1PrivateKey(rsaKey),
        },
    )

    // Load our decrypted key pair
    crt, err := tls.X509KeyPair(certIn, pemdata)
    if err != nil {
        log.Error("cannot load key pair", err)
        return nil, err
    }
    config := sarama.NewConfig()
    config.Net.TLS.Enable = true
    config.Net.TLS.Config = &tls.Config{
        Certificates: []tls.Certificate{crt},
        CipherSuites: []uint16{
            tls.TLS_RSA_WITH_AES_128_GCM_SHA256,
        },
    }

    // Setting this allows us not to read from successes channel
    config.Producer.Return.Successes = false
    // Setting this allows us not to read from errors channel
    config.Producer.Return.Errors = false
    client, err := sarama.NewClient([]string{net.JoinHostPort(host, port)}, config)
    if err != nil {
        return nil, err
    }
    return sarama.NewAsyncProducerFromClient(client)
}

When I initialize the code I get an error saying:

time="2018-01-19T15:31:38Z" level=error msg="Error trying to setup kafka: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)"

I have verified that the Kafka host is reachable and can be connected to. See below.

I verified that the key gets decrypted correctly by comparing the output of the Go code against the output of the openssl rsa -in client_ingestion_client.key -out decrypted.key command. I also made sure that the key was generated properly using keytool with the correct flags, including the -keyalg RSA flag as suggested here.

I also ran openssl s_client -connect $KAFKA_HOST:$KAFKA_PORT and got the following response

verify error:num=19:self signed certificate in certificate chain
139901934057376:error:1408E0F4:SSL routines:ssl3_get_message:unexpected message:s3_both.c:408:

The verify error is fine since I am using a self-signed cert, but I don't know what the error that follows is about. Maybe that's the cause of my problem?

Further I get the following information:

Requested Signature Algorithms: ECDSA+SHA512:RSA+SHA512:ECDSA+SHA384:RSA+SHA384:ECDSA+SHA256:RSA+SHA256:DSA+SHA256:ECDSA+SHA224:RSA+SHA224:DSA+SHA224:ECDSA+SHA1:RSA+SHA1:DSA+SHA1
Shared Requested Signature Algorithms: ECDSA+SHA512:RSA+SHA512:ECDSA+SHA384:RSA+SHA384:ECDSA+SHA256:RSA+SHA256:DSA+SHA256:ECDSA+SHA224:RSA+SHA224:DSA+SHA224:ECDSA+SHA1:RSA+SHA1:DSA+SHA1
    Peer signing digest: SHA512
    Server Temp Key: ECDH, P-256, 256 bits
    ---
    SSL handshake has read 4668 bytes and written 169 bytes
    ---
    New, TLSv1/SSLv3, Cipher is ECDHE-RSA-AES128-GCM-SHA256
    Server public key is 2048 bit
    Secure Renegotiation IS supported
    Compression: NONE
    Expansion: NONE
    No ALPN negotiated
    SSL-Session:
        Protocol  : TLSv1.2
        Cipher    : ECDHE-RSA-AES128-GCM-SHA256
        Session-ID: 5A6216C765EF33BC85FACE82B01BC506358F4D62C77817A1F7EEFB50941DAEC9
        Session-ID-ctx:
        Master-Key: F8641FBF63A0AC7AB2D6D941C421DCA44550448524DADF4F0A7943F7928E65D5773E60A45212A7F320B250595AA6737B
        Key-Arg   : None
        Krb5 Principal: None
        PSK identity: None
        PSK identity hint: None
        Start Time: 1516377799
        Timeout   : 300 (sec)
        Verify return code: 19 (self signed certificate in certificate chain)
    ---  

Since this cipher is referenced in the openssl connection:

ECDHE-RSA-AES128-GCM-SHA256

I tried adding tls.TLS_RSA_WITH_AES_128_GCM_SHA256 to my Go code, which seemed like a close match, but I still get the same error saying that the client has run out of available brokers to talk to.

Upvotes: 4

Views: 4726

Answers (2)

Garima Bathla

Reputation: 41

The correct approach is to set the tls.Config.ServerName in the Sarama client.

@Rob explains why in the comment below the accepted answer:

InsecureSkipVerify doesn't disable the hostname check. It literally does not check the certificate at all. Generally, you need to set tls.Config.ServerName to match the CN expected in the server cert you are connected to. This inane advice to disable cert checks is a widespread vulnerability in Go code that uses TLS. InsecureSkipVerify only has a few legitimate uses where the cert is actually validated with custom algorithm, or if this is only used to grab the server cert and disconnect immediately; generally to learn the CN in the server.

Upvotes: 1

Dikshant Adhikari

Reputation: 662

So I found my issue. It turns out the subdomain for the Kafka deployment had a self-signed cert, so I had to set InsecureSkipVerify: true in the config.Net.TLS.Config struct for the client. The code now looks like:

    config.Net.TLS.Config = &tls.Config{
        Certificates:       []tls.Certificate{crt},
        InsecureSkipVerify: true,
    }

There is no need to set the cipher suites either.

Upvotes: 2
