Reputation: 951
I'm trying to set the number of partitions to 2 from code. I have a single-node setup (1 ZooKeeper, 1 Kafka broker). When I consume the messages, I see that Kafka is using only one partition to store the data. Do I need to make any modifications to the setup to have multiple partitions?
private void setupZookeeper(String[] topicList) {
    ZkClient zkClient = null;
    ZkUtils zkUtils = null;
    try {
        String[] zookeeperHosts = {"localhost:2181"}; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
        int sessionTimeOutInMs = 15 * 1000; // 15 secs
        int connectionTimeOutInMs = 10 * 1000; // 10 secs
        //String topicName = "testTopic";
        int noOfPartitions = 2;
        int noOfReplication = 1;
        for (String zookeeper : zookeeperHosts) {
            zkClient = new ZkClient(zookeeper, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
            zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeper), false);
            for (String topicName : topicList) {
                System.out.println("Setting no of partitions = " + noOfPartitions + " for topic " + topicName);
                AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication,
                        producerConfig(), RackAwareMode.Disabled$.MODULE$);
            }
        }
    } catch (Exception ex) {
        ex.printStackTrace();
    } finally {
        if (zkClient != null) {
            zkClient.close();
        }
    }
}
My producerConfig looks like the following:
private Properties producerConfig() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    //props.put("retries", 0);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    return props;
}
Upvotes: 0
Views: 773
Reputation: 3228
"When I consume the messages, I see that Kafka is using only one partition to store the data."
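By default, the producer chooses a partition as follows: if the record specifies a partition, that partition is used; if no partition is specified but a key is present, the partition is chosen from a hash of the key; if neither is present, records are spread across the partitions (round-robin in older clients). Seeing only one partition in use may therefore be caused by a constant message key: the same hash value is calculated every time, so every message is routed to the same partition.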
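To illustrate the effect of a constant key, here is a minimal, self-contained sketch that does not need a running broker. The class name `PartitionSketch` and its methods are hypothetical, and `String.hashCode()` is only a stand-in: Kafka's default partitioner hashes the serialized key bytes with murmur2, so the actual partition numbers will differ. The principle is the same, though: hash of the key modulo the number of partitions.

```java
import java.util.Arrays;

public class PartitionSketch {

    // Stand-in for key-based partitioning: hash(key) mod numPartitions.
    // Assumption: String.hashCode() replaces Kafka's murmur2 hash here,
    // purely for illustration.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Counts how many of the given keys land on each partition.
    static int[] distribution(String[] keys, int numPartitions) {
        int[] counts = new int[numPartitions];
        for (String key : keys) {
            counts[partitionFor(key, numPartitions)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int numPartitions = 2;

        // Every message uses the same key, so every message maps to the same partition.
        String[] constantKeys = new String[6];
        Arrays.fill(constantKeys, "sensor");

        // Distinct keys give the hash a chance to spread messages across partitions.
        String[] variedKeys = {"key-0", "key-1", "key-2", "key-3", "key-4", "key-5"};

        System.out.println("constant key: " + Arrays.toString(distribution(constantKeys, numPartitions)));
        System.out.println("varied keys:  " + Arrays.toString(distribution(variedKeys, numPartitions)));
    }
}
```

If the producer in question sends every record with the same key, varying the key (or sending records without a key) should be enough to see both partitions receive data, assuming the topic really was created with 2 partitions.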
Upvotes: 2