Reputation: 41
how to update kakfa topic configs in springboot
public void updateTopicById(AdminClient adminClient, String topicName) {
// Map<ConfigResource, KafkaFutureImpl<Void>> allFutures = new HashMap();
// Collection<ConfigResource> unifiedRequestResources = new ArrayList();
ConfigResource cr = new ConfigResource(ConfigResource.Type.TOPIC,topicName);
ConfigEntry ce = new ConfigEntry(topicName,null);
AlterConfigOp aco=new AlterConfigOp(ConfigEntry, AlterConfigOp.OpType.SET);
Map<ConfigResource, Collection<AlterConfigOp>> mapp1=new HashMap<>();
AlterConfigsOptions var1=new AlterConfigsOptions();
adminClient.incrementalAlterConfigs(mapp1,var1);
LOGGER.info("Topics '{}' deleted from kafka.", topicName);
}```
Upvotes: 1
Views: 1059
Reputation: 504
The above sample code is not directly related to spring Kafka/SpringBoot. Spring has its own Kafka client library.
But your implementation is with the Apache Kafka client. This implementation can be improvised to work as below:
ConfigResource topicRes = new ConfigResource(ConfigResource.Type.TOPIC, "topicName1");
//update the config entry name as per use case
AlterConfigOp alterConfigOp = new AlterConfigOp(
new ConfigEntry("delete.retention.ms", "10000"), AlterConfigOp.OpType.APPEND);
List<AlterConfigOp> alt = new ArrayList<>();
alt.add(alterConfigOp);
final Map<ConfigResource, Collection<AlterConfigOp>> configsMap = new HashMap<>();
configsMap.put(topicRes, alt);
adminClient.incrementalAlterConfigs(configsMap);
Upvotes: 3
Reputation: 41
// Map<ConfigResource, KafkaFutureImpl<Void>> allFutures = new HashMap();
// Collection<ConfigResource> unifiedRequestResources = new ArrayList();
ConfigResource cr = new ConfigResource(ConfigResource.Type.TOPIC,topic.getName());
Map<String,String> topicConfigMap= topic.getConfigs();
for (Map.Entry<String, String> entry : topicConfigMap.entrySet()) {
System.out.println(entry.getKey() + "/" + entry.getValue());
//String topicConfigKey= entry.getKey().replaceAll("\\.","_").toUpperCase().concat("_CONFIG");
ConfigEntry ce = new ConfigEntry(entry.getKey(), topic.getConfigs().get(entry.getKey()));
AlterConfigOp aco=new AlterConfigOp(ce, AlterConfigOp.OpType.SET);
Map<ConfigResource, Collection<AlterConfigOp>> conf=new HashMap<>();
conf.put(cr,Arrays.asList(aco));
AlterConfigsOptions var1=new AlterConfigsOptions();
adminClient.incrementalAlterConfigs(conf,var1);
// System.out.println("repla:"+topicConfigKey);
}
This is resolving the issue, I am able to update any config. Topic is parameter coming from update service
Upvotes: 1