Reputation: 601
I have 2 Kafka topics A and B. I want that every time when my consumer starts up, it goes to topic A first. Topic A contains information about topic B, and I'll depend on that information to fetch data from topic B.
So essentially I need to read topic A first before going to topic B, and I just need to do this once upon every restart of my program.
Things I can think of like:
@KafkaListener(topics = {"A" , "B"})
Or:
@KafkaListener(topics = "A")
public void receive() {}
@KafkaListener(topics = "B")
public void receive() {}
Both don't guarantee the sequence of reading.
How can I enforce my program to read topic A first, and only go to topic B after finishing the latest updates on topic A?
Upvotes: 0
Views: 1043
Reputation: 174769
Use something like this...
@KafkaListener(id = "bReceiver", autoStartup = "false, topics = "B")
public void receive() {}
Set the idleEventInterval
and add
@Autowired
KafkaListenerEndpointRegistry registry;
@EventListener(condition = "event.listenerId == 'aReceiver`)
public void eventListener(ListenerContainerIdleEvent event) {
this.registry.getListenerContainer("aReceiver").stop(() -> { });
this.registry.getListenerContainer("bReceiver").start();
}
Upvotes: 1