Reputation: 506
I am having a small problem: I have to use Kafka as a notification system to start flows in my application. Everything works, but I would like to know whether I can alter the order in which unread messages in the queue are consumed.
Let me explain: when there are two or more messages in the topic, the consumer reads the last one that entered first, and I want it to read the first one that entered first.
So I wanted to know if this is possible and, if it is, what I have to change to achieve it.
I am using the latest version of Spring for Apache Kafka, and Kafka 2.5.0.
ConsumerConfig:
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    private static final String BOOTSTRAP_ADDRESS = "";

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_ADDRESS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
KafkaConsumer:
@Component
public class KafkaConsumer {

    @KafkaListener(topics = "Great_Topic")
    private void listen(String msg) {
        System.out.println("I've received: " + msg);
    }
}
Example:
In my producer I have sent the following messages in this order:
Output in consumer:
I want the output to have the same order as the input.
Upvotes: 5
Views: 17658
Reputation: 31
To complement the answer by Robin Moffatt: when you have multiple partitions, you need to make sure that your producers enforce FIFO ordering by using the message key of each record. For example, suppose you have two partitions, and you try to send the messages below without any key:
Hello World 1
Hello World 2
Kafka may distribute them like this:
Hello World 1 (can be stored in partition 1)
Hello World 2 (can be stored in partition 2)
Now, if you consume those messages, you cannot guarantee FIFO ordering for this group of messages. As Robin Moffatt mentioned, FIFO ordering only applies within each partition.
To enforce FIFO ordering for the same type of records, you can set the key on the producer side and assign the same key to related messages. For example, send the same messages, this time adding a key (a Kafka producer record normally contains the topic name, the key, and the message):
topic= Great_Topic key= messageKey message: Hello World 1
topic= Great_Topic key= messageKey message: Hello World 2
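In Spring code, that keyed send could look like this (a minimal sketch; it assumes a KafkaTemplate<String, String> bean configured against the same broker, and the class name OrderedProducer is just illustrative; the topic and key names are taken from the example above):
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class OrderedProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public OrderedProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendInOrder() {
        // send(topic, key, value): both records share the key "messageKey",
        // so they hash to the same partition and keep their relative order.
        kafkaTemplate.send("Great_Topic", "messageKey", "Hello World 1");
        kafkaTemplate.send("Great_Topic", "messageKey", "Hello World 2");
    }
}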
Now Kafka will guarantee that all messages with the same key flow to the same partition, like this:
Hello World 1 (will be stored in partition 1)
Hello World 2 (will be stored in partition 1)
In this case partition 2 will be empty. Or like this:
Hello World 1 (will be stored in partition 2)
Hello World 2 (will be stored in partition 2)
In this case partition 1 will be empty.
Now, if you run your Kafka consumer and make sure you are not consuming asynchronously, your application will behave as FIFO.
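In Spring Kafka terms, one way to keep consumption sequential is to give the listener container a single consumer thread (a sketch of the question's factory bean; concurrency 1 is also the default):
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // A single consumer thread processes each partition's records one at a time, in order.
    factory.setConcurrency(1);
    return factory;
}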
Upvotes: 3
Reputation: 32070
When you consume from a topic you can opt to read from the beginning, and when you do so you get all of the messages in the order in which they were produced into the partition.
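For example, for a new consumer group you could set auto.offset.reset to earliest (a sketch; this line would go into the props map of the consumerFactory() shown in the question):
// Start from the earliest available offset when the group has no committed offset yet.
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");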
If you have a single partition you will get strict FIFO behaviour.
If you have multiple partitions (as it looks like you have here) then you get FIFO within a partition, but across partitions it will be non-deterministic. Thus you could get the behaviour you show.
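For instance, if you control topic creation, you could declare the topic with a single partition (a sketch assuming Spring Kafka 2.3+ and its TopicBuilder; the topic name comes from the question):
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;

// One partition means every message goes to the same log, so consumption is strict FIFO.
@Bean
public NewTopic greatTopic() {
    return TopicBuilder.name("Great_Topic")
            .partitions(1)
            .replicas(1)
            .build();
}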
Upvotes: 18