Dmytro Serdiuk

Reputation: 959

Is there a way to start/stop Kafka consumer in a Spring app manually?

I currently have a Spring Boot CLI application that starts its Kafka consumers automatically when the app starts. My task is to update the app so that it exposes an API for starting or stopping Kafka consumers under specific conditions. I'm going to use Spring Boot Starter Web to create that API, but I can't find a way to manage the consuming process manually. What I need is to

Any suggestions on how I can manually manage the consuming process?

Technical details:

Upvotes: 4

Views: 3694

Answers (1)

Black Diamond

Reputation: 513

If you have multiple consumers configured in your app, you need a unique key to distinguish them.
The example below assumes a multi-consumer configuration.

Example of a consumer configuration:

consumers:
      "fun-consumer-key":
        topic:  fun.topic-1
        consumerProcessOnStart: org.kafka.ProcessConsumedMessage
        consumerProperties:
          "[bootstrap.servers]": localhost:9092,localhost:9093
          # Other consumer configs

Consumer Factory Listener:

@Component
@Slf4j
public class ConsumerFactoryListener<K, V> implements ConsumerFactory.Listener<K, V> {

  @Override
  public void consumerAdded(final String id, final Consumer<K, V> consumer) {
    //
  }

  @Override
  public void consumerRemoved(final String id, final Consumer<K, V> consumer) {
    //
  }

}

AppPropertiesConfig, which holds the consumer containers by key:

@UtilityClass
public class AppPropertiesConfig {

  private static final Map<String, Object> configConsumers = new ConcurrentHashMap<>();

  public static Map<String, Object> getConfigConsumers() {
    return configConsumers;
  }
}

Message listeners: auto-acknowledge and manual acknowledge. (Both are shown together here, but they need separate classes because they implement different listener interfaces.)

    // Hypothetical base class (the name is not from the original answer).
    // It holds the processing logic shared by both listeners.
    public abstract class AbstractMessageListener<K, V> {

      protected void onMessageConsume(ConsumerRecord<K, V> consumerRecord,
          final ProcessConsumedMessage<K, V> consumeMessage) {
        // Your custom processing implementation.
        // If process(...) declares a checked exception, catch it here:
        // onMessage cannot rethrow it.
        consumeMessage.process(consumerRecord.key(), consumerRecord.value());
      }
    }

    public class AutoAckMessageListener<K, V> extends AbstractMessageListener<K, V>
        implements MessageListener<K, V> {

      private final ProcessConsumedMessage<K, V> consumeMessage;

      public AutoAckMessageListener(final ProcessConsumedMessage<K, V> consumeMessage) {
        this.consumeMessage = consumeMessage;
      }

      @Override
      public void onMessage(ConsumerRecord<K, V> consumerRecord) {
        onMessageConsume(consumerRecord, consumeMessage);
      }
    }

    public class AckMessageListener<K, V> extends AbstractMessageListener<K, V>
        implements AcknowledgingMessageListener<K, V> {

      private final ProcessConsumedMessage<K, V> consumeMessage;

      public AckMessageListener(final ProcessConsumedMessage<K, V> consumeMessage) {
        this.consumeMessage = consumeMessage;
      }

      @Override
      public void onMessage(ConsumerRecord<K, V> consumerRecord, final Acknowledgment acknowledgment) {
        onMessageConsume(consumerRecord, consumeMessage);
        // Acknowledge only after the record has been processed
        acknowledgment.acknowledge();
      }
    }

Initialize Message listener:

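  public MessageListener<K, V> getListener(String className) {
    // getClassFromName is a helper that is not shown here; it is assumed to
    // instantiate the class named in the configuration (consumerProcessOnStart)
    final ProcessConsumedMessage<K, V> consumeMessage =
        (ProcessConsumedMessage<K, V>) getClassFromName(className);
    // isAutoCommitEnabled is assumed to be a field set from the consumer properties
    if (isAutoCommitEnabled) {
      return new AutoAckMessageListener<>(consumeMessage);
    }
    return new AckMessageListener<>(consumeMessage);
  }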
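
The getClassFromName helper is not shown in this answer. One way it could work is plain reflection: load the class named in consumerProcessOnStart and call its no-arg constructor. Below is a minimal sketch of that idea, not the actual helper. MessageProcessor, LoggingProcessor and ProcessorLoader are hypothetical names standing in for ProcessConsumedMessage and its implementations, and the sketch assumes the configured class has a no-arg constructor.

```java
// Hypothetical stand-in for the answer's ProcessConsumedMessage interface.
interface MessageProcessor<K, V> {
  void process(K key, V value);
}

// Hypothetical implementation that a configuration entry could name.
class LoggingProcessor implements MessageProcessor<String, String> {
  @Override
  public void process(String key, String value) {
    System.out.println(key + " -> " + value);
  }
}

final class ProcessorLoader {
  private ProcessorLoader() {}

  // Loads the class by its fully qualified name and calls its no-arg constructor.
  static MessageProcessor<?, ?> load(String className) {
    try {
      Class<?> type = Class.forName(className);
      // Fail early if the configured class is not a processor at all.
      if (!MessageProcessor.class.isAssignableFrom(type)) {
        throw new IllegalArgumentException(className + " does not implement MessageProcessor");
      }
      return (MessageProcessor<?, ?>) type.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      // Covers a missing class, a missing constructor, and instantiation failures.
      throw new IllegalArgumentException("Cannot instantiate " + className, e);
    }
  }
}
```

If your processors are Spring beans, looking them up from the application context instead of instantiating them reflectively is probably the better choice, since reflection bypasses dependency injection.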

START a consumer:

  public void startConsumer(final String key, final String topic,
      final Object messageListener, final Map<String, Object> consumerProperties) {

    // If the container was already created, start it if needed and return
    ConcurrentMessageListenerContainer<K, V> container =
        (ConcurrentMessageListenerContainer<K, V>) AppPropertiesConfig
            .getConfigConsumers().get(key);  // key - "fun-consumer-key"
    if (container != null) {
      if (!container.isRunning()) {
        container.start();
      }
      return;
    }

    final DefaultKafkaConsumerFactory<K, V> factory =
        new DefaultKafkaConsumerFactory<>(consumerProperties);
    factory.addListener(consumerFactoryListener);

    final ConcurrentKafkaListenerContainerFactory<K, V> containerFactory =
        new ConcurrentKafkaListenerContainerFactory<>();
    containerFactory.setConsumerFactory(factory);

    final ContainerProperties containerProperties = containerFactory.getContainerProperties();
    containerProperties.setPollTimeout(pollTimeout);

    // With auto-commit disabled, offsets are committed when the listener acknowledges
    if (!isAutoCommitEnabled) {
      containerProperties.setAckMode(AckMode.MANUAL_IMMEDIATE);
    }

    // Pass your retry configuration to getErrorHandler if you need one.
    // setErrorHandler is the spring-kafka 2.x API; newer versions use setCommonErrorHandler.
    containerFactory.setErrorHandler(getErrorHandler());
    // create the container
    container = containerFactory.createContainer(topic);
    container.setupMessageListener(messageListener);
    // start
    container.start();
    AppPropertiesConfig.getConfigConsumers().put(key, container);
  }

  private SeekToCurrentErrorHandler getErrorHandler() {
    // Provide your error handler. Ex: SeekToCurrentErrorHandler
    return new SeekToCurrentErrorHandler();
  }

STOP a consumer:

  public void stopConsumer(final String key) {
    if (StringUtils.isBlank(key)) {
      return;
    }
    final ConcurrentMessageListenerContainer<K, V> container
        = (ConcurrentMessageListenerContainer<K, V>) AppPropertiesConfig
        .getConfigConsumers().get(key);
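    if (container == null || !container.isRunning()) {
      // Unchecked, so the method signature does not need a throws clause
      throw new IllegalStateException("No running consumer found for key: " + key);
    }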
    try {
      container.stop();
    } catch (Exception e) {
      // log
      throw e;
    } finally {
      AppPropertiesConfig.getConfigConsumers().remove(key);
    }
  }
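
To make the start/stop bookkeeping easier to follow, here is a minimal plain-Java sketch of the same pattern with the Kafka parts stripped out. It is an illustration under assumptions, not a drop-in replacement: Startable, FakeContainer and ConsumerRegistry are hypothetical names, and Startable only mimics the start(), stop() and isRunning() calls used on the container above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical stand-in for a listener container: only the lifecycle calls used above.
interface Startable {
  void start();
  void stop();
  boolean isRunning();
}

// Trivial in-memory implementation, for demonstration only.
class FakeContainer implements Startable {
  private volatile boolean running;

  @Override public void start() { running = true; }
  @Override public void stop() { running = false; }
  @Override public boolean isRunning() { return running; }
}

class ConsumerRegistry {
  private final Map<String, Startable> containers = new ConcurrentHashMap<>();

  // Reuses the container registered under the key, or creates one, then starts it if needed.
  void start(String key, Supplier<Startable> factory) {
    Startable container = containers.computeIfAbsent(key, k -> factory.get());
    if (!container.isRunning()) {
      container.start();
    }
  }

  // Removes the container and stops it; returns false if nothing was registered.
  boolean stop(String key) {
    Startable container = containers.remove(key);
    if (container == null) {
      return false;
    }
    container.stop();
    return true;
  }

  boolean isRunning(String key) {
    Startable container = containers.get(key);
    return container != null && container.isRunning();
  }
}
```

Unlike stopConsumer above, this sketch returns false instead of throwing when the key is unknown, which may be more convenient if the call comes from a REST endpoint. A controller method would then only need to call start or stop with the consumer key, e.g. "fun-consumer-key".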

Upvotes: 2
