Reputation: 11
I have a multi-client Spring Boot application that sends and receives Kafka messages between its clients (which essentially means each instance of the application has both a consumer and a producer in it). The configuration is as simple as it can be:
Inside the @SpringBootApplication class (this could be placed in a @Configuration class as well, but I didn't feel the need to create a new class just for this bean):
@Bean
public NewTopic generalTopic() {
return TopicBuilder.name("topic")
.partitions(10)
.replicas(10)
.build();
}
Kafka producer and consumer configuration classes? We don't do that here. Instead, KafkaTemplate is injected into the class that is going to send the message:
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
To produce a message, just invoke the send(String topic, V data) method of KafkaTemplate:
kafkaTemplate.send("topic", "Hello World!");
To consume the messages, a @KafkaListener is used:
@KafkaListener(topics="topic", groupId="topic")
public void consumer(String message) {
System.out.println(message);
}
The properties are in application.properties:
spring.kafka.bootstrap-servers=194.113.64.103:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
Every client runs all of this code. The application does send and consume messages, but for some reason the consumer sometimes receives a message and sometimes does not (maybe the application sometimes does not send the messages? I doubt it, but who knows). The interval between received and missed messages is short: it varies between 1 and 10 seconds. Say I send one message per second (the messages being "1" to "10"). Sometimes I receive "1", "2", "6", "8", sometimes "4", "7", "8", "9". It seems to be completely random. Note that my server is running on another continent (it is in the U.S.; all clients are located in South America).
Any thoughts?
P.S.: I know that publishing my server IP is a security risk, but this is a temporary test server and nothing other than the Kafka broker runs on it, so it is not a problem. I decided to keep the address in this post so that everyone can test the described behavior.
Output of
"./kafka-consumer-groups.sh --describe 'topic' --bootstrap-server 194.113.64.103:9092 --all-groups"
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
topic topic 1 0 0 0 consumer-topic-1-5b2ea195-f747-4d53-a17a-53c20a768a5f /168.194.160.183 consumer-topic-1
topic topic 0 0 0 0 consumer-topic-1-5b2ea195-f747-4d53-a17a-53c20a768a5f /168.194.160.183 consumer-topic-1
topic topic 4 1 1 0 consumer-topic-1-5b2ea195-f747-4d53-a17a-53c20a768a5f /168.194.160.183 consumer-topic-1
topic topic 3 2 2 0 consumer-topic-1-5b2ea195-f747-4d53-a17a-53c20a768a5f /168.194.160.183 consumer-topic-1
topic topic 2 0 0 0 consumer-topic-1-5b2ea195-f747-4d53-a17a-53c20a768a5f /168.194.160.183 consumer-topic-1
topic topic 7 1 1 0 consumer-topic-1-c724077c-e911-4d6c-bb1d-1cba17c26a02 /168.194.160.183 consumer-topic-1
topic topic 6 0 0 0 consumer-topic-1-c724077c-e911-4d6c-bb1d-1cba17c26a02 /168.194.160.183 consumer-topic-1
topic topic 5 0 0 0 consumer-topic-1-c724077c-e911-4d6c-bb1d-1cba17c26a02 /168.194.160.183 consumer-topic-1
topic topic 9 1 1 0 consumer-topic-1-c724077c-e911-4d6c-bb1d-1cba17c26a02 /168.194.160.183 consumer-topic-1
topic topic 8 1 1 0 consumer-topic-1-c724077c-e911-4d6c-bb1d-1cba17c26a02 /168.194.160.183 consumer-topic-1
[root@my-vps bin]#
You can test my application by placing this in your main class:
@Bean
CommandLineRunner commandLineRunner(KafkaTemplate<String, String> kafkaTemplate) {
return args -> {
for (int i = 0; i < 10; i++)
kafkaTemplate.send("topic", "Hello! " + i);
};
}
Upvotes: 1
Views: 6410
Reputation: 191854
From what you've shown in your code, everything is working as expected.
Your producer doesn't include a key, so events are spread across your 10 topic partitions (round-robin, or batch by batch with the sticky partitioner that newer clients use by default). You are calling send(String topic, V value), not send(String topic, K key, V value). (Side note: you don't need such a high replication factor.)
Your @KafkaListener has a hard-coded groupId, and Kafka consumers in the same group cannot read the same partitions at the same time. So when you run multiple consumers (as shown by the 2 consumer IDs in the consumer group description), each one reads a different set of partitions. Because your producer distributes records across partitions, as mentioned above, each consumer sees only part of the messages.
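To make the effect of a shared group concrete, here is a minimal plain-Java sketch (no Kafka involved). It assumes partitions are split into contiguous ranges per consumer, which matches the 0-4 / 5-9 split in the describe output, and it assumes, purely for illustration, that message i lands on partition i % 10. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class GroupAssignmentSketch {

    // Split partitions 0..partitions-1 into contiguous ranges, one range per consumer.
    static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> result = new ArrayList<>();
        int base = partitions / consumers;
        int extra = partitions % consumers;
        int next = 0;
        for (int c = 0; c < consumers; c++) {
            int count = base + (c < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    // Messages one consumer would see, ASSUMING message m is placed on partition m % partitions.
    static List<Integer> seenBy(List<Integer> owned, int messages, int partitions) {
        List<Integer> seen = new ArrayList<>();
        for (int m = 0; m < messages; m++) {
            if (owned.contains(m % partitions)) {
                seen.add(m);
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        List<List<Integer>> assignment = assign(10, 2);
        for (int c = 0; c < assignment.size(); c++) {
            System.out.println("consumer " + c + " owns " + assignment.get(c)
                    + " and sees messages " + seenBy(assignment.get(c), 10, 10));
        }
    }
}
```

With two consumers in one group, each owns five partitions and therefore sees only the messages that land there. Calling assign(10, 1) models a client that has a group to itself: it owns all ten partitions and would see every message.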
You can see in the log output for the consumers which partitions they do get assigned, but you can also add a listener in Spring-Kafka for a partition onAssignment event to check this yourself.
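"maybe the application sometimes does not send the messages"
That is possible. The Kafka producer sends records asynchronously and in batches by default, not one at a time, so a send() call returning does not mean the record has reached the broker. You need to flush the producer manually to guarantee that all events have been sent.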
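Besides flushing, another way to be sure is to wait on the futures that the sends return. The sketch below shows that pattern in plain Java only: fakeSend is a hypothetical stand-in for an asynchronous producer call, not the Spring or Kafka API. With a real KafkaTemplate the equivalent would be calling flush() or blocking on the future returned by send(), whose concrete type depends on the Spring Kafka version.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AwaitSendsSketch {

    // Hypothetical stand-in for an asynchronous send: the acknowledgement arrives later, on another thread.
    static CompletableFuture<String> fakeSend(ExecutorService io, String value) {
        return CompletableFuture.supplyAsync(() -> "acked:" + value, io);
    }

    // Issue all sends without waiting, then block until every one has completed.
    static List<String> sendAllAndWait(int count) {
        ExecutorService io = Executors.newSingleThreadExecutor();
        try {
            List<CompletableFuture<String>> pending = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                pending.add(fakeSend(io, "Hello! " + i));
            }
            // Nothing is guaranteed to be delivered until this join returns.
            CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
            List<String> acks = new ArrayList<>();
            for (CompletableFuture<String> f : pending) {
                acks.add(f.join());
            }
            return acks;
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAllAndWait(10).size() + " sends acknowledged");
    }
}
```

The point of the design is that the loop itself never blocks; only the single wait at the end does, so batching is preserved while delivery is still confirmed before moving on.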
Notice that the LOG-END-OFFSET column sums to 6, which is not a multiple of 10, so only 6 events had actually reached the broker when you ran the command.
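The count can be checked by summing the LOG-END-OFFSET column from the describe output in the question. This small sketch does that, assuming offsets on this test topic started at 0 and nothing has been deleted by retention; the per-partition values are copied from that output, ordered by partition number.

```java
import java.util.Arrays;

public class EndOffsetSum {

    // LOG-END-OFFSET for partitions 0..9, copied from the describe output in the question.
    static final int[] LOG_END_OFFSETS = {0, 0, 0, 2, 1, 0, 0, 1, 1, 1};

    // Total number of records written to the topic, under the assumptions stated above.
    static int total(int[] offsets) {
        return Arrays.stream(offsets).sum();
    }

    public static void main(String[] args) {
        System.out.println("records on the topic: " + total(LOG_END_OFFSETS));
    }
}
```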
Upvotes: 0