Reputation: 11
My kafka configuration is simple as it can be:
@Bean
public NewTopic generalTopic() {
return TopicBuilder.name("topic")
.partitions(5)
.replicas(5)
.build();
}
@KafkaListener(id= "anyID", topics="topic")
public void consumer(String message) {
System.out.println(message);
}
Producer: sending messages invoking kafkaTemplate.send():
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
kafkaTemplate.send("topic", "message to send");
And finnally, kafka configuration:
spring.kafka.bootstrap-servers=localhost
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer= org.apache.kafka.common.serialization.StringSerializer
My application is a desktop application, so every client has it's own producer and consumer with this very same configuration above (excluding @KafkaListener(id= "anyID", where anyID is the user name).
When launching the application (right after closing it), sometimes it generates this error message:
2022-10-21 15:13:42.130 ERROR 3524 --- [JavaFX-Launcher] o.springframework.kafka.core.KafkaAdmin : Could not configure topics
org.springframework.kafka.KafkaException: Timed out waiting to get existing topics; nested exception is java.util.concurrent.TimeoutException
Checking the kafka server.log (under logs/server.log), there's this stack trace:
[2022-10-21 18:11:28,519] WARN Unexpected exception (org.apache.zookeeper.server.NIOServerCnxn)
EndOfStreamException: Unable to read additional data from client, it probably closed the socket: address = /127.0.0.1:34456, session = 0x100a5e36a580005
at org.apache.zookeeper.server.NIOServerCnxn.handleFailedRead(NIOServerCnxn.java:163)
at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:326)
at org.apache.zookeeper.server.NIOServerCnxnFactory$IOWorkRequest.doWork(NIOServerCnxnFactory.java:522)
at org.apache.zookeeper.server.WorkerService$ScheduledWorkRequest.run(WorkerService.java:154)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
[2022-10-21 18:11:43,670] INFO Expiring session 0x100a5e36a580005, timeout of 18000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
I'm sure the error message is related to the log stack trace, but I have no idea what's is causing it and how to solve it.
In order to get it working back, I just restart kafka with
bin/kafka-server-start.sh -daemon config/server.properties
So basically I have to keep restarting kafka everytime I get this error.
Any thoughts?
For reference, here's a full reference of the error I'm getting in the clients:
org.springframework.kafka.KafkaException: Timed out waiting to get existing topics; nested exception is java.util.concurrent.TimeoutException
at org.springframework.kafka.core.KafkaAdmin.lambda$checkPartitions$8(KafkaAdmin.java:388) ~[spring-kafka-2.8.8.jar:2.8.8]
at java.base/java.util.HashMap.forEach(HashMap.java:1421) ~[na:na]
at org.springframework.kafka.core.KafkaAdmin.checkPartitions(KafkaAdmin.java:367) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.core.KafkaAdmin.addOrModifyTopicsIfNeeded(KafkaAdmin.java:263) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.core.KafkaAdmin.initialize(KafkaAdmin.java:200) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.core.KafkaAdmin.afterSingletonsInstantiated(KafkaAdmin.java:167) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:974) ~[spring-beans-5.3.22.jar:5.3.22]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:918) ~[spring-context-5.3.22.jar:5.3.22]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:583) ~[spring-context-5.3.22.jar:5.3.22]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:734) ~[spring-boot-2.7.3.jar:2.7.3]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:408) ~[spring-boot-2.7.3.jar:2.7.3]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:308) ~[spring-boot-2.7.3.jar:2.7.3]
at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:164) ~[spring-boot-2.7.3.jar:2.7.3]
at com.nume.main.MirroredMain.springBootApplicationContext(MirroredMain.java:40) ~[classes/:na]
at com.nume.main.MirroredMain.init(MirroredMain.java:61) ~[classes/:na]
at javafx.graphics@19/com.sun.javafx.application.LauncherImpl.launchApplication1(LauncherImpl.java:825) ~[javafx.graphics.jar:na]
at javafx.graphics@19/com.sun.javafx.application.LauncherImpl.lambda$launchApplication$2(LauncherImpl.java:196) ~[javafx.graphics.jar:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: java.util.concurrent.TimeoutException: null
at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1960) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2095) ~[na:na]
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180) ~[kafka-clients-3.1.1.jar:na]
at org.springframework.kafka.core.KafkaAdmin.lambda$checkPartitions$8(KafkaAdmin.java:370) ~[spring-kafka-2.8.8.jar:2.8.8]
... 17 common frames omitted
Upvotes: 0
Views: 2314
Reputation: 191701
Your Spring client timeout is a red-herring.
Your Broker is failing to connect to Zookeeper, and crashes. Therefore Spring app will also crash. Restarts will only temporarily fix it, so you need to address why the broker is really disconnecting from Zookpeeper (do Zookepeer logs look okay?)
Upvotes: 0
Reputation: 36
I think you forgot to mention the port in your configuration
Something like this in kafka configuration:
spring.kafka.bootstrap-servers=localhost:9092
Upvotes: 2