Reputation: 1339
We have a Kafka producer-consumer setup in Java (using spring-kafka, but that is probably not relevant here). The key used is String and the value is a custom POJO. The producer and consumer communicate over a single topic test-topic having 16 partitions. The consumer has a concurrency of 16 so that it can read from each partition in parallel.
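For reference, the concurrency-of-16 setup described above is typically wired in spring-kafka roughly as follows. This is a sketch only: MyPojo and consumerFactory() are placeholders, not names from the actual project.

```java
// Sketch of a spring-kafka listener container factory whose concurrency
// matches the 16 partitions of test-topic. MyPojo and consumerFactory()
// are placeholder names.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MyPojo> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, MyPojo> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // One consumer thread per partition; more than 16 would leave threads idle.
    factory.setConcurrency(16);
    return factory;
}
```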
From the documentation and other references I understand that using null keys will distribute the messages sent by the publisher across the partitions in a round-robin fashion, and that it is advisable to use non-null keys if I am interested in routing messages to specific partitions derived from the keys.
I have the following queries here:
1. If I use a unique String key per message, in almost all cases it will generate a unique hashcode as well. How will the messages get distributed here: will it be round-robin like null keys, some hash logic that repeats the use of partitions, or some other control mechanism?
2. … null keys?
3. Is it fine to use null keys, or is it still nice to have a unique or non-unique non-null key per message, and if yes, why?
Upvotes: 2
Views: 5815
Reputation: 3842
A Kafka producer sends a message to a specific partition based on the DefaultPartitioner, a custom partitioner, or a partition passed explicitly while sending the message. Whether to use a null or non-null key depends on your use case and requirements, but the key's purpose is to distribute your messages across partitions so they can be consumed by multiple consumers of the consumer group.
A non-null key ensures that messages with the same key land on the same partition, which lets you group similar keys into the same bucket for later analysis, while a null key distributes your messages evenly. A non-null key can also carry meta details of the message for further processing. I would prefer to pass a non-null key with a custom partitioner to control the message flow, but it depends on your specific requirement, and if you want to pass a null key that is absolutely fine.
Note: in the upcoming Apache Kafka 2.5 release you will be able to define the RoundRobin partitioner as the partitioning strategy (KIP-369), which does not require the key to be null. https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=89070828
Before Apache Kafka 2.4, the partitioning strategy for null keys was to cycle through the topic's partitions and send a record to each one in turn. But because messages are sent as batches (see the linger.ms configuration parameter), this can hurt performance: each record goes to a different partition, so many small batches are produced. To address this, Apache Kafka 2.4 introduced the Sticky Partitioner (KIP-480) for the null-key case in the default partitioner, as described below.
Sticky partitioning strategy
The sticky partitioner addresses the problem of spreading out records without keys into smaller batches by picking a single partition to send all non-keyed records. Once the batch at that partition is filled or otherwise completed, the sticky partitioner randomly chooses and "sticks" to a new partition. That way, over a larger period of time, records are about evenly distributed among all the partitions while getting the added benefit of larger batch sizes.
Please click here for more detail
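The difference between the two strategies is easy to simulate in plain Java (no Kafka involved; the partition count matches the question, but the batch size, record count, and seed below are made-up numbers): round-robin sends each successive record to the next partition, while the sticky strategy keeps writing to one partition until a batch fills, then jumps to a random new one.

```java
import java.util.Arrays;
import java.util.Random;

public class StickyDemo {
    public static void main(String[] args) {
        int partitions = 16;     // matches the topic in the question
        int batchSize = 100;     // made-up: records per batch before the sticky partitioner switches
        int records = 10_000;
        Random rnd = new Random(42);  // seeded so the run is repeatable

        // Sticky: stay on one partition until the batch is full, then switch.
        int[] sticky = new int[partitions];
        int current = rnd.nextInt(partitions);
        int inBatch = 0;
        for (int i = 0; i < records; i++) {
            sticky[current]++;
            if (++inBatch == batchSize) {
                current = rnd.nextInt(partitions);
                inBatch = 0;
            }
        }

        // Round-robin: one record per partition, cycling.
        int[] roundRobin = new int[partitions];
        for (int i = 0; i < records; i++) {
            roundRobin[i % partitions]++;
        }

        // Round-robin gives exactly 10_000 / 16 = 625 per partition;
        // sticky is only roughly even, but every batch of 100 records
        // lands on a single partition, so batches are larger.
        System.out.println(Arrays.toString(sticky));
        System.out.println(Arrays.toString(roundRobin));
    }
}
```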
If you pass a non-null key and have not defined a custom partitioner, the DefaultPartitioner is used to select the partition to publish the message to. The DefaultPartitioner uses MurmurHash, a non-cryptographic hash function commonly used for hash-based lookup. The hash is then used in a modulo operation (% numPartitions) to ensure that the returned partition is within the range [0, N-1], where N is the number of partitions of the topic.
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
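The shape of that expression can be reproduced in plain Java. Note that murmur2 and toPositive live inside kafka-clients (org.apache.kafka.common.utils.Utils); this sketch substitutes String.hashCode() for murmur2 purely to illustrate the mask-then-modulo step, so the resulting partition numbers will differ from real Kafka's.

```java
public class PartitionForKey {
    // Mirror of Kafka's Utils.toPositive: masks off the sign bit so the
    // modulo result below is never negative.
    static int toPositive(int hash) {
        return hash & 0x7fffffff;
    }

    // Same shape as the DefaultPartitioner's keyed path, with String.hashCode()
    // standing in for murmur2 (so values differ from real Kafka).
    static int partitionForKey(String key, int numPartitions) {
        return toPositive(key.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 16;
        // The same key always maps to the same partition:
        System.out.println(partitionForKey("order-42", partitions)
                == partitionForKey("order-42", partitions)); // true
        // Even a key with a negative hashCode stays within [0, partitions - 1]:
        System.out.println(partitionForKey("polygenelubricants", partitions));
    }
}
```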
You can also define a custom Partitioner and implement your own logic to select the partition: https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/Partitioner.html
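The decision logic of such a custom partitioner can be sketched as plain Java. In real code this would be the body of partition() in a class implementing org.apache.kafka.clients.producer.Partitioner, registered via the partitioner.class producer property; the routing rule here (pinning "audit-" keys to partition 0) is an invented example, not anything from Kafka.

```java
public class CustomRouting {
    // Hypothetical rule: pin "audit-" records to partition 0 and spread all
    // other keys over the remaining partitions by hash. In a real Partitioner
    // this logic would live inside partition(topic, key, keyBytes, ...).
    static int route(String key, int numPartitions) {
        if (key != null && key.startsWith("audit-")) {
            return 0;
        }
        int hash = (key == null) ? 0 : key.hashCode();
        // Reserve partition 0; map everything else into 1..numPartitions-1.
        return 1 + (hash & 0x7fffffff) % (numPartitions - 1);
    }

    public static void main(String[] args) {
        System.out.println(route("audit-login", 16)); // always 0
        System.out.println(route("order-42", 16));    // somewhere in 1..15
    }
}
```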
Or you can pass the partition explicitly while publishing a message:
/**
 * Creates a record to be sent to a specified topic and partition
 */
public ProducerRecord(String topic, Integer partition, K key, V value) {
    this(topic, partition, null, key, value, null);
}
Upvotes: 3
Reputation: 3955
If you specify a key, by default the partition will be selected based on the hash of the key modulo the number of partitions (hash(key) % partitions_number). With unique keys you will get a roughly uniform distribution of messages across the partitions. You can also override the partitioning behavior by providing a custom partitioner with the required logic.
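That rough uniformity is easy to check in plain Java (again with String.hashCode() standing in for Kafka's murmur2, so the exact counts are illustrative only):

```java
import java.util.Arrays;

public class KeyDistribution {
    public static void main(String[] args) {
        int partitions = 16;
        int[] counts = new int[partitions];
        // 16_000 unique keys, hashed and taken modulo the partition count,
        // mimicking the default partitioner's keyed path (hashCode() here
        // instead of murmur2).
        for (int i = 0; i < 16_000; i++) {
            String key = "msg-" + i;
            counts[(key.hashCode() & 0x7fffffff) % partitions]++;
        }
        // Expect roughly 1000 per partition; small deviations are normal,
        // since the hash only approximates a uniform spread.
        System.out.println(Arrays.toString(counts));
    }
}
```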
Based on your description you don't need a key, so use null. It will slightly: 1) save resources on the Kafka cluster (no unneeded key is stored), 2) decrease network latency, 3) spare the app from generating a unique key and calculating its hash (all these points are minor and not significant compared to actually needing keys). Keys should be used if you want ordering of messages that share the same key (not your case, since your keys are unique), or if you have business logic based on the key. Batching will work fine with null keys.
Upvotes: 0
Reputation: 1093
The client controls which partition it publishes messages to. This can be done at random, implementing a kind of random load balancing, or it can be done by some semantic partitioning function. We expose the interface for semantic partitioning by allowing the user to specify a key to partition by and using this to hash to a partition (there is also an option to override the partition function if need be).
The key is used to split the messages across the different partitions. With a null key, in my experience, they all end up in partition 0.
Null keys are not a good design choice and should not be used in production, maybe just for fast prototyping.
Sticky partitioning strategy: The sticky partitioner addresses the problem of spreading out records without keys into smaller batches by picking a single partition to send all non-keyed records.
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
                     Cluster cluster, int numPartitions) {
    // Null key: let the sticky partitioner pick (and stick to) a partition.
    if (keyBytes == null) {
        return stickyPartitionCache.partition(topic, cluster);
    }
    // Non-null key: hash the key bytes and take the result modulo the partition count.
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
Upvotes: 0