music bot

Reputation: 11

Spring Kafka consumer keeps disconnecting from broker at random time intervals

My Kafka configuration is as simple as it can be:

@Bean
public NewTopic generalTopic() {
    return TopicBuilder.name("topic")
            .partitions(5)
            .replicas(5)
            .build();
}

@KafkaListener(id= "anyID", topics="topic")
public void consumer(String message) {
    System.out.println(message);
}

The producer sends messages by invoking kafkaTemplate.send():

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

kafkaTemplate.send("topic", "message to send");

And finally, the Kafka configuration:

spring.kafka.bootstrap-servers=localhost

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

My application is a desktop application, so every client has its own producer and consumer with the very same configuration as above (except for @KafkaListener(id= "anyID"), where anyID is the user name).

When launching the application (right after closing it), sometimes it generates this error message:

2022-10-21 15:13:42.130 ERROR 3524 --- [JavaFX-Launcher] o.springframework.kafka.core.KafkaAdmin  : Could not configure topics

org.springframework.kafka.KafkaException: Timed out waiting to get existing topics; nested exception is java.util.concurrent.TimeoutException

In the Kafka server.log (under logs/server.log), there's this stack trace:

    [2022-10-21 18:11:28,519] WARN Unexpected exception (org.apache.zookeeper.server.NIOServerCnxn)
EndOfStreamException: Unable to read additional data from client, it probably closed the socket: address = /127.0.0.1:34456, session = 0x100a5e36a580005
    at org.apache.zookeeper.server.NIOServerCnxn.handleFailedRead(NIOServerCnxn.java:163)
    at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:326)
    at org.apache.zookeeper.server.NIOServerCnxnFactory$IOWorkRequest.doWork(NIOServerCnxnFactory.java:522)
    at org.apache.zookeeper.server.WorkerService$ScheduledWorkRequest.run(WorkerService.java:154)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
[2022-10-21 18:11:43,670] INFO Expiring session 0x100a5e36a580005, timeout of 18000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)

I'm sure the error message is related to the stack trace in the log, but I have no idea what is causing it or how to solve it.

To get it working again, I just restart Kafka with:

bin/kafka-server-start.sh -daemon config/server.properties

So basically I have to keep restarting Kafka every time I get this error.

Any thoughts?

For reference, here's the full stack trace of the error I'm getting in the clients:

    org.springframework.kafka.KafkaException: Timed out waiting to get existing topics; nested exception is java.util.concurrent.TimeoutException
    at org.springframework.kafka.core.KafkaAdmin.lambda$checkPartitions$8(KafkaAdmin.java:388) ~[spring-kafka-2.8.8.jar:2.8.8]
    at java.base/java.util.HashMap.forEach(HashMap.java:1421) ~[na:na]
    at org.springframework.kafka.core.KafkaAdmin.checkPartitions(KafkaAdmin.java:367) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.core.KafkaAdmin.addOrModifyTopicsIfNeeded(KafkaAdmin.java:263) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.core.KafkaAdmin.initialize(KafkaAdmin.java:200) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.core.KafkaAdmin.afterSingletonsInstantiated(KafkaAdmin.java:167) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:974) ~[spring-beans-5.3.22.jar:5.3.22]
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:918) ~[spring-context-5.3.22.jar:5.3.22]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:583) ~[spring-context-5.3.22.jar:5.3.22]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:734) ~[spring-boot-2.7.3.jar:2.7.3]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:408) ~[spring-boot-2.7.3.jar:2.7.3]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:308) ~[spring-boot-2.7.3.jar:2.7.3]
    at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:164) ~[spring-boot-2.7.3.jar:2.7.3]
    at com.nume.main.MirroredMain.springBootApplicationContext(MirroredMain.java:40) ~[classes/:na]
    at com.nume.main.MirroredMain.init(MirroredMain.java:61) ~[classes/:na]
    at javafx.graphics@19/com.sun.javafx.application.LauncherImpl.launchApplication1(LauncherImpl.java:825) ~[javafx.graphics.jar:na]
    at javafx.graphics@19/com.sun.javafx.application.LauncherImpl.lambda$launchApplication$2(LauncherImpl.java:196) ~[javafx.graphics.jar:na]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: java.util.concurrent.TimeoutException: null
    at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1960) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2095) ~[na:na]
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180) ~[kafka-clients-3.1.1.jar:na]
    at org.springframework.kafka.core.KafkaAdmin.lambda$checkPartitions$8(KafkaAdmin.java:370) ~[spring-kafka-2.8.8.jar:2.8.8]
    ... 17 common frames omitted

Upvotes: 0

Views: 2314

Answers (2)

OneCricketeer

Reputation: 191701

Your Spring client timeout is a red herring.

Your broker is failing to connect to ZooKeeper and crashes, so the Spring app will also crash. Restarts will only fix it temporarily, so you need to address why the broker is really disconnecting from ZooKeeper (do the ZooKeeper logs look okay?).
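To tell "the broker is down" apart from "the client is misconfigured", a plain TCP check before starting the Spring context can help. This is only a rough sketch using the JDK, not anything from Spring Kafka: the class name PortCheck and the method isReachable are made up for illustration, and the ports are assumptions (9092 is the usual broker listener, 2181 the usual ZooKeeper client port). An open port only shows the process is listening, not that it is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Spring Kafka or kafka-clients.
public class PortCheck {

    // Returns true if a TCP connection to host:port can be opened within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, unknown host, etc.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed default ports; adjust to match server.properties and zookeeper.properties.
        System.out.println("broker    localhost:9092 reachable: " + isReachable("localhost", 9092, 2000));
        System.out.println("zookeeper localhost:2181 reachable: " + isReachable("localhost", 2181, 2000));
    }
}
```

If the broker port is closed when the KafkaAdmin timeout appears, the problem is on the server side and the broker and ZooKeeper logs are the place to look.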

Upvotes: 0

Nitish Sharma

Reputation: 36

I think you forgot to specify the port in your configuration.

Something like this in the Kafka configuration:

spring.kafka.bootstrap-servers=localhost:9092

Upvotes: 2
