AlpacaMan
AlpacaMan

Reputation: 514

Spring cloud stream Kafka Failed to create producer binding

I have a spring-boot project that has dependencies of spring-cloud-starter-stream-kafka and spring-cloud-stream of version 2.1.2. It acts as a producer. When it start, it keeps warning me:

2019-05-06 15:00:25.136 WARN 28116 --- [| adminclient-2] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-2] Connection to node -1 could not be established. Broker may not be available.

and throw an error:

2019-05-06 15:01:10.280 ERROR 28116 --- [ask-scheduler-1] o.s.cloud.stream.binding.BindingService  : Failed to create producer binding; retrying in 30 seconds

org.springframework.cloud.stream.provisioning.ProvisioningException: Provisioning exception; nested exception is java.util.concurrent.TimeoutException
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:290) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:137) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:78) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:193) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:97) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:151) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.lambda$rescheduleProducerBinding$2(BindingService.java:290) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_201]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_201]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_201]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[na:1.8.0_201]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_201]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_201]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_201]
Caused by: java.util.concurrent.TimeoutException: null
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:108) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:274) ~[kafka-clients-2.0.1.jar:na]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:323) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:299) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:281) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    ... 14 common frames omitted

This is my bootstrap class:

@EnableEurekaClient
@SpringBootApplication
@EnableHystrixDashboard
@EnableCircuitBreaker
@EnableBinding(Channels.class)
public class Service1Application {

    public static void main(String[] args) {
        SpringApplication.run(Service1Application.class, args);
    }

}

This is the Channels class :

public interface Channels {
    String outputChannel = "myOutputChannel";

    @Output(outputChannel)
    MessageChannel myOutputChannel();
}

This is application.properties:

eureka.instance.prefer-ip-address=true
eureka.client.register-with-eureka=true
eureka.client.fetch-registry=true
eureka.client.service-url.defaultZone=http://localhost:8761/eureka/
spring.application.name=service1
server.port=8081

spring.cloud.stream.bindings.myOutputChannel.destination=myTopic
spring.cloud.stream.bindings.myOutputChannel.content-type=application/json

Upvotes: 1

Views: 19420

Answers (1)

Dina Bogdan
Dina Bogdan

Reputation: 4718

This is because you are not running Kafka on the local machine. If you want to specify another Kafka Brokers property you can do it in the application.properties like this:

spring.cloud.stream.kafka.binder.brokers=host1,host2
spring.cloud.stream.kafka.binder.defaultBrokerPort=port

The default values for the above two properties are localhost and 9092 and if you are not running a Kafka Cluster on localhost:9092 your application will fail with that error.

Upvotes: 4

Related Questions