Reputation: 43
I have a scenario where I have to read all the messages from compacted topic (topic 2) from beginning. I have to save all these messages in memory which will act as lookup/cache.
I have another topic (Topic 1) from which once messages arrive, I have to do some lookup from the cache we created above and process further.
How to make sure during startup, KafkaListener for Topic 1 does not start until KafkaListener for Topic 2 read all the messages loaded in the cache?
Upvotes: 1
Views: 646
Reputation: 1
I have a scenario to consume the messages from topic 1 first and once all consumed from topic 1 then consume topic 2 messages. I used ContainerGroupSequencer as you mentioned in above comments.
As expected group 1 is stopped after receiving all messages and moved to group 2. But in my case I don't want to stop group1 after reading events. Just looking to read messages from topic 1 first and then topic2.
Upvotes: 0
Reputation: 174769
There is a new feature in 2.7.3.
https://docs.spring.io/spring-kafka/docs/current/reference/html/#sequencing
A common use case is to start a listener after another listener has consumed all the records in a topic. For example, you may want to load the contents of one or more compacted topics into memory before processing records from other topics. Starting with version 2.7.3, a new component
ContainerGroupSequencer
has been introduced. It uses the @KafkaListener containerGroup property to group containers together and start the containers in the next group, when all the containers in the current group have gone idle.
It is best illustrated with an example.
@KafkaListener(id = "listen1", topics = "topic1", containerGroup = "g1", concurrency = "2")
public void listen1(String in) {
}
@KafkaListener(id = "listen2", topics = "topic2", containerGroup = "g1", concurrency = "2")
public void listen2(String in) {
}
@KafkaListener(id = "listen3", topics = "topic3", containerGroup = "g2", concurrency = "2")
public void listen3(String in) {
}
@KafkaListener(id = "listen4", topics = "topic4", containerGroup = "g2", concurrency = "2")
public void listen4(String in) {
}
@Bean
ContainerGroupSequencer sequencer(KafkaListenerEndpointRegistry registry) {
return new ContainerGroupSequencer(registry, 5000, "g1", "g2");
}
Here, we have 4 listeners in two groups, g1 and g2.
During application context initialization, the sequencer, sets the
autoStartup
property of all the containers in the provided groups to false. It also sets theidleEventInterval
for any containers (that do not already have one set) to the supplied value (5000ms in this case). Then, when the sequencer is started by the application context, the containers in the first group are started. AsListenerContainerIdleEvent
s are received, each individual child container in each container is stopped. When all child containers in aConcurrentMessageListenerContainer
are stopped, the parent container is stopped. When all containers in a group have been stopped, the containers in the next group are started. There is no limit to the number of groups or containers in a group.
By default, the containers in the final group (g2 above) are not stopped when they go idle. To modify that behavior, set stopLastGroupWhenIdle to true on the sequencer.
With earlier versions, you have to implement the sequencing yourself; see this answer.
Upvotes: 1