Reputation: 55
We have specific topic and it is required to consume messages only if condition consumeEnabled=true. So, it should work like this:
Case when app is consuming messages, but then consumeEnabled become false not necessary to consider.
Please, hep define the best way to implement decision with Spring Kafka and\or Kafka Java client
Upvotes: 2
Views: 3363
Reputation: 174779
If you are using @KafkaListener
then
@KafkaListener(id = "foo", ... , autoStartup="${consume.enabled}")
where consume.enabled
is a property.
To start/stop a container at runtime, use the KafkaListenerEndpointRegistry
bean.
registry.getListenerContainer("foo").start();
Upvotes: 7
Reputation: 192013
You could put your consumer in a simple thread that toggles the polling state of the consumer object.
public class EnabledConsumer implements Runnable {
private Consumer consumer;
private boolean enabled;
public EnabledConsumer(Consumer consumer, boolean enabled) {
this.consumer = consumer;
this.enabled = enabled;
}
public void setEnabled(boolean enable) {
this.enabled = enable;
}
@Override
public void run() {
while(enabled) {
ConsumerRecords records = consumer.poll(...);
...
}
}
Upvotes: 1