Duke

Reputation: 11

Spring Kafka - Consumers don't receive message sometimes

I have a multi-client Spring Boot application whose clients send and receive Kafka messages (which essentially means the application contains both a consumer and a producer). The configuration is as simple as it can be:

Inside the @SpringBootApplication class (it could be placed in a @Configuration class as well, but I didn't feel the need to create a new class just for that one bean):

@Bean
public NewTopic generalTopic() {
    return TopicBuilder.name("topic")
            .partitions(10)
            .replicas(10)
            .build();
}

Kafka producer and consumer configuration classes? We don't do that here; instead, KafkaTemplate is injected into the class that is going to send the message:

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

To produce a message, just invoke the send(K, V) method on the KafkaTemplate:

kafkaTemplate.send("topic", "Hello World!");

To consume the messages, a @KafkaListener is used:

@KafkaListener(topics="topic", groupId="topic")
public void consumer(String message) {
    System.out.println(message);
}
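
For debugging which partition each record arrives on, the listener can take the partition as a header. This is a sketch: depending on your Spring Kafka version, the header constant is KafkaHeaders.RECEIVED_PARTITION (3.x) or KafkaHeaders.RECEIVED_PARTITION_ID (2.x).

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;

// Hypothetical variant of the listener above that also logs the partition
// the record came from (useful for seeing how records are spread out).
@KafkaListener(topics = "topic", groupId = "topic")
public void consumer(String message,
                     @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
    System.out.println("partition " + partition + ": " + message);
}
```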

The properties are in application.properties:

spring.kafka.bootstrap-servers=194.113.64.103:9092

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

Every single client runs all this code. The application is sending and consuming messages, although, for some reason, sometimes the consumer receives a message and sometimes it does not (maybe the application sometimes does not send the messages? I doubt it, but who knows). The interval between received and unreceived messages is minimal: it varies between 1 and 10 seconds. So let's say I send one message per second (the messages being "1" through "10"). Sometimes I receive "1", "2", "6", "8"; sometimes "4", "7", "8", "9". It seems to be completely random. Note that my server is running on another continent (the U.S.; all clients are located in South America).

Any thoughts?

P.S.: I know posting my server's IP is a security risk, but this is a temporary test server and nothing other than the Kafka broker runs on it, so it is not a problem. I decided to keep the address in this post so everyone could test the described behavior.

Output of

"./kafka-consumer-groups.sh --describe 'topic' --bootstrap-server 194.113.64.103:9092 --all-groups"

    GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST             CLIENT-ID
topic           topic           1          0               0               0               consumer-topic-1-5b2ea195-f747-4d53-a17a-53c20a768a5f /168.194.160.183 consumer-topic-1
topic           topic           0          0               0               0               consumer-topic-1-5b2ea195-f747-4d53-a17a-53c20a768a5f /168.194.160.183 consumer-topic-1
topic           topic           4          1               1               0               consumer-topic-1-5b2ea195-f747-4d53-a17a-53c20a768a5f /168.194.160.183 consumer-topic-1
topic           topic           3          2               2               0               consumer-topic-1-5b2ea195-f747-4d53-a17a-53c20a768a5f /168.194.160.183 consumer-topic-1
topic           topic           2          0               0               0               consumer-topic-1-5b2ea195-f747-4d53-a17a-53c20a768a5f /168.194.160.183 consumer-topic-1
topic           topic           7          1               1               0               consumer-topic-1-c724077c-e911-4d6c-bb1d-1cba17c26a02 /168.194.160.183 consumer-topic-1
topic           topic           6          0               0               0               consumer-topic-1-c724077c-e911-4d6c-bb1d-1cba17c26a02 /168.194.160.183 consumer-topic-1
topic           topic           5          0               0               0               consumer-topic-1-c724077c-e911-4d6c-bb1d-1cba17c26a02 /168.194.160.183 consumer-topic-1
topic           topic           9          1               1               0               consumer-topic-1-c724077c-e911-4d6c-bb1d-1cba17c26a02 /168.194.160.183 consumer-topic-1
topic           topic           8          1               1               0               consumer-topic-1-c724077c-e911-4d6c-bb1d-1cba17c26a02 /168.194.160.183 consumer-topic-1

You can test my application by placing this in your Main class:

@Bean
CommandLineRunner commandLineRunner(KafkaTemplate<String, String> kafkaTemplate) {
    return args -> {
        for (int i = 0; i < 10; i++)
            kafkaTemplate.send("topic", "Hello! " + i);
    };
}

Upvotes: 1

Views: 6410

Answers (1)

OneCricketeer

Reputation: 191854

From what you've shown in your code, everything is working as expected.

Your producer doesn't include a key, so events are round-robined across your 10 topic partitions (side note: you don't need such a high replication factor). You are calling send(String topic, V value), not send(String topic, K key, V value).
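
If you want related events to land on the same partition (so one consumer sees all of them, in order), pass an explicit key; the default partitioner hashes the key to pick the partition. A minimal sketch ("someKey" is an arbitrary example key):

```java
// Every record sent with the same key goes to the same partition,
// so a single group member will receive all of them.
kafkaTemplate.send("topic", "someKey", "Hello World!");
```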

Your @KafkaListener has a hard-coded groupId, and Kafka consumers in the same group cannot read the same partitions at the same time. So by running multiple consumers (as shown by the two consumer-ids in the consumer group description), each one reads only its own subset of partitions. As mentioned above, your producer is distributing records across the various partitions.
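
The effect is easy to simulate without a broker: round-robin 10 keyless sends over 10 partitions, split the partitions 5/5 between two group members, and each member only ever sees half the messages. A plain-Java sketch under those assumptions (class and method names are illustrative):

```java
import java.util.*;

public class GroupSplitSketch {
    // How many of 10 keyless, round-robined messages consumer A sees,
    // assuming it owns partitions 0-4 and consumer B owns 5-9.
    static int messagesSeenByConsumerA() {
        int partitions = 10;
        Map<Integer, List<String>> byPartition = new HashMap<>();
        for (int i = 1; i <= 10; i++) {
            // Round-robin assignment: one message per partition.
            byPartition.computeIfAbsent(i % partitions, k -> new ArrayList<>())
                       .add(String.valueOf(i));
        }
        Set<Integer> consumerA = Set.of(0, 1, 2, 3, 4); // the other 5 belong to B
        int seen = 0;
        for (Map.Entry<Integer, List<String>> e : byPartition.entrySet()) {
            if (consumerA.contains(e.getKey())) seen += e.getValue().size();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("Consumer A sees " + messagesSeenByConsumerA()
                + " of 10 messages");
    }
}
```

With real Kafka the split is not this clean (batching and the sticky partitioner skew it), but the principle is the same: each group member only sees its own partitions.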

You can see in the log output which partitions each consumer gets assigned, but you can also add a listener in Spring Kafka for the partition-assignment event to check this yourself.

maybe the application sometimes does not send the messages

That is correct. Kafka sends data in batches by default, not one record at a time. You need to manually flush the producer to guarantee all events are sent.
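
This matters for the CommandLineRunner above, which fires 10 sends and returns immediately. A minimal sketch of forcing delivery, using KafkaTemplate's flush() or blocking on the returned future (the future's exact type varies by Spring Kafka version, but both expose get()):

```java
// Push any batched records to the broker now, before the app exits.
kafkaTemplate.send("topic", "Hello World!");
kafkaTemplate.flush();

// Or block until the broker acknowledges this one send
// (get() throws checked exceptions you'd handle in real code).
kafkaTemplate.send("topic", "Hello World!").get();
```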

Notice that the LOG-END-OFFSET column adds up to only 6, not 10, so not all of the events were actually sent.

Upvotes: 0
