user2777473

Reputation: 3826

Spring Cloud Stream Kafka binding config

I'm trying to develop a Spring Cloud app that uses Kafka.

The configuration used for Kafka is:

spring:
  application:
    name: service-sample

  cloud:
    stream:
      bindings:
        output: 
          destination: lrctms-cloud-dev
          content-type: application/json

      kafka:
        binder:
          brokers: 192.168.11.153
          defaultBrokerPort: 9092
          zkNodes: 192.168.11.153

When I run the application, I can see that these settings are picked up:

o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values:
      bootstrap.servers = [192.168.11.153:9092]
      client.id =
      connections.max.idle.ms = 300000
      metadata.max.age.ms = 300000
      metric.reporters = []
      metrics.num.samples = 2
      metrics.recording.level = INFO
      metrics.sample.window.ms = 30000
      receive.buffer.bytes = 65536
      reconnect.backoff.max.ms = 1000
      reconnect.backoff.ms = 50
      request.timeout.ms = 120000
      retries = 5
      retry.backoff.ms = 100
      sasl.jaas.config = null
      sasl.kerberos.kinit.cmd = /usr/bin/kinit
      sasl.kerberos.min.time.before.relogin = 60000
      sasl.kerberos.service.name = null
      sasl.kerberos.ticket.renew.jitter = 0.05
      sasl.kerberos.ticket.renew.window.factor = 0.8
      sasl.mechanism = GSSAPI
      security.protocol = PLAINTEXT
      send.buffer.bytes = 131072
      ssl.cipher.suites = null
      ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
      ssl.endpoint.identification.algorithm = null
      ssl.key.password = null
      ssl.keymanager.algorithm = SunX509
      ssl.keystore.location = null
      ssl.keystore.password = null
      ssl.keystore.type = JKS
      ssl.protocol = TLS
      ssl.provider = null
      ssl.secure.random.implementation = null
      ssl.trustmanager.algorithm = PKIX
      ssl.truststore.location = null
      ssl.truststore.password = null
      ssl.truststore.type = JKS

The problem is the following error message:

adminclient-1] o.apache.kafka.common.network.Selector   : [AdminClient clientId=adminclient-1] Connection with 127.0.0.1 disconnected
 java.net.ConnectException: Connection refused
         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
         at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
         at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
         at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:106)
         at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:458)
         at org.apache.kafka.common.network.Selector.poll(Selector.java:412)
         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
         at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1006)
         at java.lang.Thread.run(Thread.java:748)

How do I configure this "AdminClient" and pass the correct host/IP information to it? I checked the Spring Cloud Stream Kafka Binder Reference Guide but could not find the answer.

Upvotes: 0

Views: 2205

Answers (1)

OneCricketeer

Reputation: 191743

Moving comment to answer

According to the logs, the right configuration was set. However, that address is only used for the initial connection to the broker. The Kafka cluster then sends back to your client the advertised.host.name / advertised.listeners of every broker in the cluster, and in most cases these need to be set to an external address of the broker that external clients can resolve. If you are getting anything else (in your case 127.0.0.1), this is the property to check.
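
As a rough sketch of what to look for, the fragment below shows the relevant entries in the broker's server.properties. It assumes a single broker running on the 192.168.11.153 host from the question, listening on port 9092 with the PLAINTEXT protocol shown in the logs. The listeners bind address is also an assumption, so adjust both values to your setup.

```properties
# server.properties on the broker (assumed here to be the host at 192.168.11.153)

# Interface and port the broker binds to (assumption: all interfaces, port 9092)
listeners=PLAINTEXT://0.0.0.0:9092

# Address returned to clients after the initial bootstrap connection.
# It must be resolvable and reachable from the machine running the Spring app,
# so it should not be 127.0.0.1 or localhost.
advertised.listeners=PLAINTEXT://192.168.11.153:9092
```

The broker has to be restarted for a change to these properties to take effect. After that, the AdminClient should reconnect to the advertised address instead of 127.0.0.1.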

Upvotes: 1
