Reputation: 131
I'm using Kafka Admin client API's to create the topic. The topic is getting created, however the topic is getting created with 1 partition by default. The API is not honoring the configurable value provided. Not sure if I'm using it correctly.
Note: Topic creation is enabled at broker level. Also the topic is getting created, but it is getting created with partition 1.
NewTopic newTopic = new NewTopic(TOPIC_NAME, 10, (short) 1);
CreateTopicsResult createTopicsResult = null;
try {
createTopicsResult = KafkaAdminClient.create(getAdminProperties()).createTopics(Collections.singletonList(newTopic));
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
However I'm able to increase the partitions of earlier created topics using Kafka Admin Client API's
Upvotes: 0
Views: 2306
Reputation: 11
You forgot to call .get() on createTopicsResult to wait for a response to your query.
Try this code:
createTopicsResult.values().get(TOPIC_NAME).get()
Upvotes: 1
Reputation: 654
I tried to reproduce this, without success, using the following code:
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class AdminApiDemo {
private static final String BOOTSRAP_SERVER = "localhost:9092";
private static final String TOPIC_NAME = "demoTopic";
private static final int NUM_PARTITIONS = 3;
private static final short NUM_REPLICAS = 1;
private final AdminClient adminClient;
private AdminApiDemo(Properties properties) {
this.adminClient = KafkaAdminClient.create(properties);
}
public static void main(String[] args) {
final Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSRAP_SERVER);
new AdminApiDemo(properties).createTopic(TOPIC_NAME, NUM_PARTITIONS, NUM_REPLICAS);
}
private void createTopic(String topicName, int numPartitions, short numReplicas) {
try {
final NewTopic newTopic = new NewTopic(topicName, numPartitions, numReplicas);
final CreateTopicsResult result = adminClient.createTopics(Collections.singletonList(newTopic));
result.values().get(topicName).get(5, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
}
}
A kafka-topics --describe
showed the following:
root@kafka:/# kafka-topics --bootstrap-server localhost:9092 --describe --topic demoTopic
Topic:demoTopic PartitionCount:3 ReplicationFactor:1 Configs:
Topic: demoTopic Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: demoTopic Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: demoTopic Partition: 2 Leader: 1 Replicas: 1 Isr: 1
I thought, ok, what if the topic maybe exists before creation, but then again I got an java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'demoTopic' already exists.
, so that could not be your case either.
I know this is no "real" answer, that fixes anything, sorry for that. But I hope it helps, anyway. Maybe someone else can use this to reproduce it in his setting and "sees" the problem.
Upvotes: 1