Chitra Singh
Chitra Singh

Reputation: 41

Kafka topic configuration update in springboot

how to update kakfa topic configs in springboot

    public void updateTopicById(AdminClient adminClient,  String topicName) {
      //  Map<ConfigResource, KafkaFutureImpl<Void>> allFutures = new HashMap();
       // Collection<ConfigResource> unifiedRequestResources = new ArrayList();
        ConfigResource cr = new ConfigResource(ConfigResource.Type.TOPIC,topicName);
        ConfigEntry ce = new ConfigEntry(topicName,null);
        AlterConfigOp aco=new AlterConfigOp(ConfigEntry, AlterConfigOp.OpType.SET);
       Map<ConfigResource, Collection<AlterConfigOp>> mapp1=new HashMap<>();
        AlterConfigsOptions var1=new AlterConfigsOptions();
        adminClient.incrementalAlterConfigs(mapp1,var1);
        LOGGER.info("Topics '{}' deleted from kafka.", topicName);
    }```

Upvotes: 1

Views: 1059

Answers (2)

kus
kus

Reputation: 504

The above sample code is not directly related to spring Kafka/SpringBoot. Spring has its own Kafka client library.

But your implementation is with the Apache Kafka client. This implementation can be improvised to work as below:

ConfigResource topicRes = new ConfigResource(ConfigResource.Type.TOPIC, "topicName1");

//update the config entry name as per use case
AlterConfigOp alterConfigOp = new AlterConfigOp(
        new ConfigEntry("delete.retention.ms", "10000"), AlterConfigOp.OpType.APPEND);

List<AlterConfigOp> alt = new ArrayList<>();
alt.add(alterConfigOp);
final Map<ConfigResource, Collection<AlterConfigOp>> configsMap = new HashMap<>();
configsMap.put(topicRes, alt);

adminClient.incrementalAlterConfigs(configsMap); 

Upvotes: 3

Chitra Singh
Chitra Singh

Reputation: 41

      //  Map<ConfigResource, KafkaFutureImpl<Void>> allFutures = new HashMap();
       // Collection<ConfigResource> unifiedRequestResources = new ArrayList();
        ConfigResource cr = new ConfigResource(ConfigResource.Type.TOPIC,topic.getName());
        Map<String,String> topicConfigMap= topic.getConfigs();
        for (Map.Entry<String, String> entry : topicConfigMap.entrySet()) {
            System.out.println(entry.getKey() + "/" + entry.getValue());
            //String topicConfigKey= entry.getKey().replaceAll("\\.","_").toUpperCase().concat("_CONFIG");
            ConfigEntry ce = new ConfigEntry(entry.getKey(), topic.getConfigs().get(entry.getKey()));
            AlterConfigOp aco=new AlterConfigOp(ce, AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> conf=new HashMap<>();
            conf.put(cr,Arrays.asList(aco));
            AlterConfigsOptions var1=new AlterConfigsOptions();
            adminClient.incrementalAlterConfigs(conf,var1);
           // System.out.println("repla:"+topicConfigKey);
        }

This is resolving the issue, I am able to update any config. Topic is parameter coming from update service

Upvotes: 1

Related Questions