王奕然

Reputation: 4049

How to distribute messages uniformly across all partitions

I use Kafka and send messages to a Kafka broker. My topic has 24 partitions, and I want the messages to be distributed uniformly across all 24 of them. Currently I build my key like this:

String topicName="data_"+region;
JSONObject jsonObject = JSON.parseObject(json);
Random rand = new Random();
int n = rand.nextInt(50) + 1;
ListenableFuture<SendResult<Integer, String>> result = kafkaTemplate.send(topicName, type + n, jsonObject.toJSONString());

But the messages are not uniformly distributed. How should I design my key? Should it be a hash value, or something else? Thanks for any suggestions!

Upvotes: 0

Views: 1089

Answers (1)

Lachezar Balev

Reputation: 12021

Well, the short answer is that the prefix of your key (type+n) is the culprit. But why? Well, I'm not sure, because I've left my maths at home today :-)

Nevertheless, let's peek under the hood! When you use keys for your records (which I highly recommend, because you may want to rely on log compaction later) and you write your app in Java or with Spring Kafka, the partition in which a record ends up is determined by the Kafka Java libraries. More specifically, the decision maker is the default implementation of org.apache.kafka.clients.producer.Partitioner, which is org.apache.kafka.clients.producer.internals.DefaultPartitioner.

Here is how the partition is actually calculated:

return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
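The formula above can be tried without the Kafka dependency. The following is a minimal sketch, not the library's code: PartitionFormulaSketch and its methods are hypothetical names, and the murmur2 body is a re-implementation written to follow the 32-bit MurmurHash2 variant in kafka-clients as I understand it, so check it against Utils.murmur2 before relying on exact partition numbers.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper class for illustration; it is not part of Kafka.
class PartitionFormulaSketch {

    // 32-bit MurmurHash2 (assumption: intended to mirror Utils.murmur2 in kafka-clients).
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995; // mixing constant
        final int r = 24;         // mixing shift

        int h = seed ^ length;

        // Mix the input four bytes at a time (little-endian).
        for (int i = 0; i < length / 4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff)
                    + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16)
                    + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // Mix the remaining one to three bytes, if any.
        int tail = length & ~3;
        int rem = length % 4;
        if (rem == 3) {
            h ^= (data[tail + 2] & 0xff) << 16;
        }
        if (rem >= 2) {
            h ^= (data[tail + 1] & 0xff) << 8;
        }
        if (rem >= 1) {
            h ^= data[tail] & 0xff;
            h *= m;
        }

        // Final avalanche.
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Clears the sign bit so the modulo below never yields a negative index.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    // Same shape as the formula quoted above: hash the key bytes, then take the modulo.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return toPositive(murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 5; n++) {
            String key = "type_" + n;
            System.out.println(key + " -> partition " + partitionFor(key, 24));
        }
    }
}
```

Whatever the exact hash values are, the formula is deterministic: a given key always lands in the same partition, so the number of distinct keys limits how many partitions can ever receive data.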

The hash function used is murmur2. Let's write a short snippet that emulates the distribution of 10K records over 24 partitions when the key is prefixed (as yours is):

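import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.apache.kafka.common.utils.Utils;

Random rand = new Random();
// partition index -> number of records that landed there
Map<Integer, Integer> distro = new HashMap<>();

for (int i = 0; i < 10000; i++) {
    // Same key scheme as in the question: a fixed prefix plus a random number in 1..50
    int n = rand.nextInt(50) + 1;

    // Same formula that the default partitioner applies to keyed records
    int partition = Utils.toPositive(Utils.murmur2(("type_" + n).getBytes())) % 24;
    distro.merge(partition, 1, Integer::sum);
}

distro.forEach((partition, count) -> System.out.println("Partition= " + partition + " Entries= " + count));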

Here is the bad distribution that you experience:

Partition= 2 Entries= 180
Partition= 4 Entries= 388
Partition= 5 Entries= 813
Partition= 6 Entries= 1438
Partition= 7 Entries= 572
Partition= 9 Entries= 791
Partition= 10 Entries= 1036
Partition= 12 Entries= 815
Partition= 14 Entries= 184
Partition= 15 Entries= 579
Partition= 16 Entries= 608
Partition= 18 Entries= 610
Partition= 19 Entries= 215
Partition= 20 Entries= 562
Partition= 21 Entries= 395
Partition= 22 Entries= 370
Partition= 23 Entries= 444

As you can see, some partitions are not populated at all, and partitions 6 and 10 are a bit overloaded. Now let us remove the prefix from your small key, like this:

int partition = Utils.toPositive(Utils.murmur2((String.valueOf(n)).getBytes())) % 24;

Things look a bit more uniform now, but the distribution is still not perfect (partitions 2, 9, 17 and 20 stay empty):

Partition= 0 Entries= 799
Partition= 1 Entries= 411
Partition= 3 Entries= 835
Partition= 4 Entries= 224
Partition= 5 Entries= 563
Partition= 6 Entries= 591
Partition= 7 Entries= 812
Partition= 8 Entries= 596
Partition= 10 Entries= 211
Partition= 11 Entries= 424
Partition= 12 Entries= 608
Partition= 13 Entries= 225
Partition= 14 Entries= 187
Partition= 15 Entries= 786
Partition= 16 Entries= 584
Partition= 18 Entries= 606
Partition= 19 Entries= 425
Partition= 21 Entries= 159
Partition= 22 Entries= 554
Partition= 23 Entries= 400

You may use UUIDs for your keys, as we do, e.g.:

int partition = Utils.toPositive(Utils.murmur2(UUID.randomUUID().toString().getBytes())) % 24;

And this works pretty smoothly with murmur2:

Partition= 0 Entries= 429
Partition= 1 Entries= 407
Partition= 2 Entries= 420
Partition= 3 Entries= 435
Partition= 4 Entries= 407
Partition= 5 Entries= 421
Partition= 6 Entries= 403
Partition= 7 Entries= 460
Partition= 8 Entries= 399
Partition= 9 Entries= 415
Partition= 10 Entries= 386
Partition= 11 Entries= 402
Partition= 12 Entries= 424
Partition= 13 Entries= 434
Partition= 14 Entries= 391
Partition= 15 Entries= 426
Partition= 16 Entries= 399
Partition= 17 Entries= 430
Partition= 18 Entries= 435
Partition= 19 Entries= 418
Partition= 20 Entries= 403
Partition= 21 Entries= 418
Partition= 22 Entries= 402
Partition= 23 Entries= 436

Another option is to increase the range of your key, which currently only goes up to 50.
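To see why a wider key range helps, note that each distinct key is pinned to one partition, so 50 distinct keys behave like 50 balls thrown into 24 bins. The sketch below is only an illustration under assumptions: it swaps murmur2 for an idealized, uniformly random assignment of keys to partitions (it is not Kafka's hash), and KeyRangeSketch, expectedEmptyPartitions and partitionsHit are hypothetical names. Even with such an ideal hash, the formula gives roughly 2.9 empty partitions on average for 50 keys and 24 partitions.

```java
import java.util.Random;

// Hypothetical illustration class; it does not use or emulate Kafka's murmur2.
class KeyRangeSketch {

    // Expected number of empty partitions when `keys` distinct keys are each
    // assigned independently and uniformly at random to one of `partitions`.
    static double expectedEmptyPartitions(int keys, int partitions) {
        return partitions * Math.pow(1.0 - 1.0 / partitions, keys);
    }

    // Gives every distinct key one uniformly random partition (a stand-in for
    // an ideal hash) and counts how many partitions received at least one key.
    static int partitionsHit(int keys, int partitions, long seed) {
        Random rand = new Random(seed);
        boolean[] hit = new boolean[partitions];
        int count = 0;
        for (int k = 0; k < keys; k++) {
            int p = rand.nextInt(partitions);
            if (!hit[p]) {
                hit[p] = true;
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        int partitions = 24;
        for (int keys : new int[] {50, 500, 5000}) {
            System.out.printf("keys=%d expectedEmpty=%.2f partitionsHit=%d%n",
                    keys,
                    expectedEmptyPartitions(keys, partitions),
                    partitionsHit(keys, partitions, 42L));
        }
    }
}
```

The partitions that do receive keys also hold different numbers of them when there are only 50 keys, which is why the per-partition counts stay uneven. With thousands of distinct keys (or UUIDs) both effects fade.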

Upvotes: 4
