Reputation: 1819
I have an old project (it's not mine) and I'm trying to update it from Kafka 2.1 to 2.4.
I have the following piece of code
public synchronized void increasePartitions(String topic, int partitions) throws InvalidPartitionsException, IllegalArgumentException {
StringBuilder commandString = new StringBuilder();
commandString.append("--alter");
commandString.append(" --topic ").append(topic);
commandString.append(" --zookeeper ").append(config.getOrDefault("zookeeper.connect",
"localhost:2181"));
commandString.append(" --partitions ").append(partitions);
String[] command = commandString.toString().split(" ");
TopicCommand.alterTopic(kafkaZkClient, new TopicCommand.TopicCommandOptions(command));
}
It says that the alterTopic
method of TopicCommand
doesn't exist. I'm looking at the documentation and I don't know how to solve it.
I need this method to do the exact same thing but with Kafka version 2.4.
Upvotes: 0
Views: 36
Reputation: 26885
You should use the Admin API to perform tasks like this.
In order to add partitions, there's the createPartitions()
method.
For example, to increase the number of partitions for my-topic
to 10:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
Admin admin = Admin.create(props);
Map<String, NewPartitions> newPartitions = new HashMap<>();
newPartitions.put("my-topic", NewPartitions.increaseTo(10));
CreatePartitionsResult createPartitions = admin.createPartitions(newPartitions);
createPartitions.all().get();
Upvotes: 1