Mukund Jalan
Mukund Jalan

Reputation: 1339

Using different key for each message in kafka

We have a kafka producer-consumer setup in Java (using spring-kafka, but probably that is not relevant here). The key used is String and value is custom POJO. The producer-consumer communicate over a single topic test-topic having 16 partitions. The consumer has a concurrency of 16 so that it can read from each partition in parallel.

From the documentation & other references I understand that - Using null keys will distribute the messages sent by the publisher in the partitions in a round-robin fashion. It is advisable to use non-null keys if I am interested in distributing the messages to specific partitions derived using keys

I have the following queries here -

  1. Currently, the producer sends unique key per message. As this is a unique String per message, in almost all the cases it will generate a unique hashcode as well. How will the messages get distributed here, will it be round-robin like null keys or some hash logic that repeats the use of partitions or any other control mechanism?
  2. Is there any pro or con of using keys with strategy as mentioned above over null keys?
  3. I have no requirement of maintaining the order or grouping of messages. In this case is it sensible to use null keys or is it still nice to have a unique or non-unique non-null key per message, if yes, why?
  4. The consumers are reading the messages in batches. Does having or not having a key impact batch reads differently from singular reads?

Upvotes: 2

Views: 5815

Answers (3)

Nitin
Nitin

Reputation: 3842

Kafka producer sends a message to a specific partition based on DefaultPartitioner, custom partitioner, or pass partition information while sending a message to get write to a specific partition. The defined key as null or not null is based on your use cases and requirements but the key purpose is to distribute your messages on different partitions to get consumed by multiple consumers of the consumer group.

The nonnull key makes sure a similar key will park on the same partition which will help you to group multiple similar keys on the same bucket to further analysts at the same time null key make you distribute your messages evenly.

The nonnull key always helps to pass meta details of message for further processing. I would like to prefer to pass the nonnull key with a custom partitioner to control message flow. But it's up to specific requirement and if you want to pass key null that's absolutely fine.

Note: In future release Apache Kafka (2.5) you can able to define RoundRobin partitioner as partition strategy(KIP-369) which not needed to the key to be null. https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=89070828

  1. If you are not defined custom partitioner it will use the default partitioner

Before Apache Kafka 2.4 it will be going through cycle one after another and send the record to each one. In this case, the old partitioning strategy before Apache Kafka 2.4 would be to cycle through the topic’s partitions and send a record to each one. But as you understand messages go as a batch with configuration parameter linger.ms it may performance impact on small batches as each one goes to specific partitions so Apache Kafka introducer new Sticky partitioner in case of the null key

Apache Kafka introduced Sticky Partitioner(KIP-480) in Apache Kafka 2.4 in case of key null in the default partitioner as mentioned below

Sticky partitioning strategy

The sticky partitioner addresses the problem of spreading out records without keys into smaller batches by picking a single partition to send all non-keyed records. Once the batch at that partition is filled or otherwise completed, the sticky partitioner randomly chooses and “sticks ” to a new partition. That way, over a larger period of time, records are about evenly distributed among all the partitions while getting the added benefit of larger batch sizes.

enter image description here

Please click here for more detail

  1. If you passing a nonnull key and not defined custom partitioner It will be used DefaultPartitioner to identify partition to publish messages. DefaultPartitioner makes use of MurmurHash, a non-cryptographic hash function which is usually used for hash-based lookup. This hash is then used in a modulo operation (% numPartitions) in order to ensure that the returned partition is within the range [0, N] where N is the number of partitions of the topic.

    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

  2. You can also define Custom Partitioner and implement logic to select the partition https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/Partitioner.html

  3. Pass partition explicitly while publishing a message

    /** * Creates a record to be sent to a specified topic and partition */ public ProducerRecord(String topic, Integer partition, K key, V value) { this(topic, partition, null, key, value, null); }

Upvotes: 3

Vasyl Sarzhynskyi
Vasyl Sarzhynskyi

Reputation: 3955

If you specify key, by default partition will be selected based on hash from key modulo number of partitions (hash(key) % partitions_number). By having unique keys you will have a uniform distribution of messages by partitions. You also could override partition behavior by providing your custom partitioner with required logic.

Based on your description, you don't need the key, in that case use null, it will slightly: 1) save resources on Kafka cluster (do not store not needed key), 2) decrease network latency, 3) app will not generate a unique key and not calculate hash from it (all these points are minor and not significant in comparison if you have keys). Keys should be used if you want to order of messages having the same key (but it's not your case, as you have unique keys), or you have business logic based on this key. Batching will work fine with null keys.

Upvotes: 0

aballaci
aballaci

Reputation: 1093

The client controls which partition it publishes messages to. This can be done at random, implementing a kind of random load balancing, or it can be done by some semantic partitioning function. We expose the interface for semantic partitioning by allowing the user to specify a key to partition by and using this to hash to a partition (there is also an option to override the partition function if need be).

The key is used to split the messages to different partitions. With a null key they will end all in the Partition 0 from my experience.

Null Keys are not a good design choice and should not be used in production. Maybe just for fast prototyping.

Sticky partitioning strategy The sticky partitioner addresses the problem of spreading out records without keys into smaller batches by picking a single partition to send all non-keyed records

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
                     int numPartitions) {
    if (keyBytes == null) {
        return stickyPartitionCache.partition(topic, cluster);
    }

Upvotes: 0

Related Questions