Reputation: 959
As for now, I have a Spring Boot CLI application that starts Kafka consumers automatically when the app starts. And I have a task to update the app that provides an API that allows to start or stop Kafka consumers on the specific conditions. So, I'm going to use Spring Boot Starter WEB to create that API. But I can't find a way on how I can manually manage the consuming process. What I need is to
Any suggestions on how I can manually manage the consuming process?
Technical details:
@KafkaListener
is used to create a listenerConcurrentKafkaListenerContainerFactory
is used as KafkaListenerContainerFactory
Upvotes: 4
Views: 3694
Reputation: 513
If you have multiple consumers configured in your App, then you must be using a unique key to distinguish them.
The below example is considering multiple consumer config.
Example of a consumer configuration:
consumers:
"fun-consumer-key":
topic: fun.topic-1
consumerProcessOnStart: org.kafka.ProcessConsumedMessage
consumerProperties:
"[bootstrap.servers]": localhost:9092,localhost:9093
// Other consumer configs
Consumer Factory Listener:
@Component
@Slf4j
public class ConsumerFactoryListener<K, V> implements Listener<K, V> {
@Override
public void consumerAdded(final String id, final Consumer<K, V> consumer) {
//
}
@Override
public void consumerRemoved(final String id, final Consumer<K, V> consumer) {
//
}
}
AppPropertiesConfig to hold Consumer object:
@UtilityClass
public class AppPropertiesConfig {
private static Map<String, Object> configConsumers = new ConcurrentHashMap<>();
public static Map<String, Object> getConfigConsumers() {
return consumerMap;
}
}
Message Listener: Ack & Auto Ack (I am adding Auto Acknowledge and Acknowledge Message Listeners, but you'll have to have separate classes as they implement different Listeners.)
public class AutoAckMessageListener<K, V> extends
implements MessageListener<K, V> {
private final ProcessConsumedMessage<K, V> consumeMessage;
@Override
public void onMessage(ConsumerRecord<K, V> consumerRecord) {
onMessageConsume(consumerRecord, consumeMessage);
}
}
public class AckMessageListener<K, V> extends
implements AcknowledgingMessageListener<K, V> {
private final ProcessConsumedMessage<K, V> consumeMessage;
@Override
public void onMessage(ConsumerRecord<K, V> consumerRecord, final Acknowledgment acknowledgment) {
onMessageConsume(consumerRecord, consumeMessage);
acknowledgment.acknowledge();
}
}
// You can put this method in an abstract class and both listener classes can extend this Abstract class with onMessageConsume method
public void onMessageConsume(ConsumerRecord<K, V> consumerRecord,
final ProcessConsumedMessage<K, V> consumeMessage) throws InterruptedException {
// Your custom processing implementation
consumeMessage.process(consumerRecord.key(), consumerRecord.value());
}
Initialize Message listener:
public MessageListener getListener(String className) {
final ProcessConsumedMessage consumeMessage = (ProcessConsumedMessage) getClassFromName();
MessageListener listener;
if (isAutoCommitEnabled) {
listener = new AutoAckMessageListener(consumeMessage);
} else {
listener = new AckMessageListener(consumeMessage);
}
return listener;
}
START a consumer:
public void startConsumer(final String key, final String topic,
final Object messageListener, final Map<String, Object> consumerProperties) {
// Check already created, start and return
ConcurrentMessageListenerContainer<K, V> container =
(ConcurrentMessageListenerContainer<K, V>) AppPropertiesConfig
.getConfigConsumers().get(key); // key - "fun-consumer-key"
if (container != null) {
if (!container.isRunning()) {
container.start();
}
return;
}
final DefaultKafkaConsumerFactory<K, V> factory = new DefaultKafkaConsumerFactory<>(
consumerProperties);
factory.addListener(consumerFactoryListener);
final ConcurrentKafkaListenerContainerFactory<K, V> containerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
containerFactory.setConsumerFactory(factory);
final ContainerProperties containerProperties = containerFactory.getContainerProperties();
containerProperties.setPollTimeout(pollTimeout);
// auto-commit??
if (!isAutoCommitEnabled) {
containerProperties.setAckMode(AckMode.MANUAL_IMMEDIATE);
}
containerFactory.setErrorHandler(getErrorHandler(<some retry configurations object>));
// create the container
container = containerFactory.createContainer(topic);
container.setupMessageListener(messageListener);
// start
container.start();
AppPropertiesConfig.getConfigConsumers().put(key, container);
}
private SeekToCurrentErrorHandler getErrorHandler() {
// Provide your error handler. Ex: SeekToCurrentErrorHandler
}
STOP a consumer:
public void stopConsumer(final String key) {
if (StringUtils.isBlank(key)) {
return;
}
final ConcurrentMessageListenerContainer<K, V> container
= (ConcurrentMessageListenerContainer<K, V>) AppPropertiesConfig
.getConfigConsumers().get(key);
if (container == null || !container.isRunning()) {
throw new Exception();
}
try {
container.stop();
} catch (Exception e) {
// log
throw e;
} finally {
AppPropertiesConfig.getConfigConsumers().remove(key);
}
}
Upvotes: 2