Frederic Tardif
Frederic Tardif

Reputation: 126

Kafka: Delete idle consumer group id

In some cases, I use Kafka-stream to model a small in memory (hashmap) projection of a topic. The K,V cache does require some manipulations, so it is not a good case for a GlobalKTable. In such a “caching” scenario, I want all my sibling instances to have the same cache, so I need to bypass the consumer-group mechanism.

To enable this, I normally simply start my apps with a randomly generated application Id, so each app will reload the topic each time it restarts. The only caveat to that is that I end up with some consumer group orphaned on the kafka brokers up to the offsets.retention.minutes which is not ideal for our operational monitoring tools. Any idea how to workaround this?

Thanks

Upvotes: 2

Views: 2232

Answers (1)

Michael Heil
Michael Heil

Reputation: 18475

There is a Java API in the AdminClient called deleteConsumerGroups which can be used to delete individual ConsumerGroups.

You can use it as shown below with Kafka 2.5.0.

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;

public class DeleteConsumerGroups {
  public static void main(String[] args) {
    System.out.println("*** Starting AdminClient to delete a Consumer Group ***");

    final Properties properties = new Properties();
    properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");
    properties.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "5000");

    AdminClient adminClient = AdminClient.create(properties);
    String consumerGroupToBeDeleted = "console-consumer-65092";
    DeleteConsumerGroupsResult deleteConsumerGroupsResult = adminClient.deleteConsumerGroups(Arrays.asList(consumerGroupToBeDeleted));

    KafkaFuture<Void> resultFuture = deleteConsumerGroupsResult.all();
    try {
      resultFuture.get();
    } catch (InterruptedException e) {
      e.printStackTrace();
    } catch (ExecutionException e) {
      e.printStackTrace();
    }

    adminClient.close();
  }
}

ConsumerGroups List before running the code above

$ kafka-consumer-groups --bootstrap-server localhost:9092 --list
console-consumer-65092
console-consumer-53268

ConsumerGroups List after running the code above

$ kafka-consumer-groups --bootstrap-server localhost:9092 --list
console-consumer-53268

Upvotes: 4

Related Questions