Reputation: 111
This is how I produce a message:
String json = gson.toJson(msg);
ProducerRecord<String, String> record = new ProducerRecord<>(kafkaProducerConfig.getTopic(), json);
long startTime = System.currentTimeMillis();
try {
    RecordMetadata meta = producer.send(record).get(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
} catch (TimeoutException e) {
    e.printStackTrace();
}
I have 15 partitions for this topic. I did not specify a partition key when producing, so which partition will the message be assigned to by default?
Upvotes: 10
Views: 6334
Reputation: 191681
Since you're not sending a key as part of the record, the key is null.
Kafka has a DefaultPartitioner that round-robins records with null keys across the topic's partitions. As of Kafka 2.4, sticky partitioning was introduced, so for null keys the partition is only chosen per batch rather than per record.
For non-null keys, a Murmur2 hash of the serialized key is computed, then taken modulo the number of partitions for the topic.
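To make the keyed case concrete, here is a minimal, self-contained sketch of "hash the key bytes, then take the result modulo the partition count". The class name KeyPartitionSketch and the keys are made up for illustration. The murmur2 method is a hand-written 32-bit MurmurHash2 intended to mirror what Kafka's Utils.murmur2 does, but I have not verified it against a broker, so treat the exact partition numbers as illustrative. It also assumes the key is serialized as UTF-8 bytes, as the StringSerializer does by default.

```java
import java.nio.charset.StandardCharsets;

// Illustrative only: a real producer relies on org.apache.kafka.common.utils.Utils.
final class KeyPartitionSketch {

    // 32-bit MurmurHash2 with a fixed seed (assumed to match Kafka's variant).
    static int murmur2(byte[] data) {
        int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;

        // Mix four bytes at a time into the hash.
        for (int i = 0; i < length / 4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff)
                    + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16)
                    + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // Mix in the remaining 1 to 3 bytes, if any.
        int tail = length & ~3;
        switch (length % 4) {
            case 3: h ^= (data[tail + 2] & 0xff) << 16; // fall through
            case 2: h ^= (data[tail + 1] & 0xff) << 8;  // fall through
            case 1: h ^= data[tail] & 0xff;
                    h *= m;
                    break;
            default: break;
        }

        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Clear the sign bit so the modulo below never yields a negative index.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    // Same key and same partition count always give the same partition.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return toPositive(murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 15; // the partition count from the question
        for (String key : new String[] {"order-1", "order-2", "order-1"}) {
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

The practical consequence is that records sharing a key land on the same partition as long as the partition count does not change. If you add partitions later, the modulo changes and existing keys can map elsewhere.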
Upvotes: 14
Reputation: 3832
If you have not defined a custom partitioner, the producer uses the default partitioner, which follows the rule below.
Here is the default partitioner implementation (the round-robin version that predates Kafka 2.4), for a better understanding:
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}
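For the null-key branch above, a small stand-alone simulation may help show what the counter does with the 15 partitions from the question. This is only a sketch under simplifying assumptions: the class NullKeyRoundRobinSketch is hypothetical, it assumes every partition is available, and it starts the counter at 0 so the run is reproducible.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the null-key branch; not Kafka's own class.
final class NullKeyRoundRobinSketch {
    private final AtomicInteger counter;
    private final int numPartitions;

    NullKeyRoundRobinSketch(int numPartitions, int start) {
        this.numPartitions = numPartitions;
        this.counter = new AtomicInteger(start); // assumption: fixed start for reproducibility
    }

    // Take the next counter value, clear the sign bit, and wrap around the partition count.
    int nextPartition() {
        int nextValue = counter.getAndIncrement();
        return (nextValue & 0x7fffffff) % numPartitions;
    }

    // Count how many of `sends` null-key records each partition would receive.
    static int[] countSends(int numPartitions, int sends) {
        NullKeyRoundRobinSketch rr = new NullKeyRoundRobinSketch(numPartitions, 0);
        int[] counts = new int[numPartitions];
        for (int i = 0; i < sends; i++) {
            counts[rr.nextPartition()]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 30 records without a key over 15 partitions.
        System.out.println(Arrays.toString(countSends(15, 30)));
    }
}
```

With 30 sends over 15 partitions, each partition receives two records, which is the even spread you get from this round-robin rule when no key is set.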
Upvotes: 5