sclee1

Reputation: 1281

Multiple kafka topics with the number of parallelism in Flink

I am testing Flink with a Kafka source.

My test scenario is as follows.

The data is already stored in Kafka topics, and Flink consumes it using FlinkKafkaConsumer in earliest mode.

The number of Kafka partitions is fixed at 15. When I tested by varying Flink's parallelism, a parallelism of 15 was the ideal case, matching the number of Kafka partitions.

The interesting case is when I used 2 Kafka topics with 15 partitions each. Note that the total data is distributed across those two topics, so the overall data size is the same as in the single-topic case described above.

In this case there are 30 partitions in the Kafka cluster, so I would expect the ideal parallelism to be 30. However, when I tested it on my cluster with 5 brokers, some idle instances (six out of 30) appeared in the Flink task manager UI.

So my questions are as follows.

  1. Why are some instances idle (6 out of 30) even though there are 30 partitions in total across the 2 topics? (I used a parallelism of 30; those six instances did not read any bytes, and I had sufficient task slots and resources.)

  2. When there is one topic and partitions >= parallelism, what is the mechanism by which the parallel instances consume the partitions? For example, with 10 partitions and a parallelism of 5, do those 5 instances first consume 5 partitions and only then consume the data in the remaining 5 partitions? Or does each instance consume the partitions in a round-robin manner?

  3. Suppose there are 2 topics with 15 partitions each and a parallelism of only 15. Is the data in the first topic consumed by the 15 instances before the second topic's data is consumed? Or do the instances consume the partitions regardless of topic?

I look forward to your answer.

Upvotes: 1

Views: 1087

Answers (1)

kkrugler

Reputation: 9245

Flink does distribute Kafka partitions to slots in a round-robin manner. The interesting bit for you is that the starting slot (for the first partition) is based on the hash of the topic name. Subsequent partitions are sent to increasing slot numbers, which wrap around when the last slot is used.

So if you have more partitions than slots, you can get an uneven number of partitions per slot if (partitions % slots) != 0.
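This unevenness is easy to see with plain round-robin counting. The sketch below (start slot fixed at 0 for simplicity, not Flink's actual hash-based start) assigns 10 partitions to 4 slots:

```java
// Sketch: round-robin assignment of 10 partitions to 4 slots.
// Since 10 % 4 != 0, two slots get 3 partitions and two get only 2.
public class UnevenAssignment {
    public static void main(String[] args) {
        int partitions = 10, slots = 4;
        int[] perSlot = new int[slots];
        for (int p = 0; p < partitions; p++) {
            perSlot[p % slots]++;  // round-robin, starting at slot 0 for simplicity
        }
        for (int s = 0; s < slots; s++) {
            System.out.println("slot " + s + " -> " + perSlot[s] + " partitions");
        }
        // prints: slot 0 -> 3, slot 1 -> 3, slot 2 -> 2, slot 3 -> 2
    }
}
```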

If you have multiple topics, all partitions are processed at the same time. So because the starting slot for each topic's first partition is based on the topic name's hash, the distribution of partitions to slots can be even more unbalanced. Worst case, if you had N topics each with 1 partition, and somehow each topic name's hash % parallelism was the same value, then one unlucky slot would get all N partitions.
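To make this concrete, here is a sketch modeled on the start-index logic in Flink's `KafkaTopicPartitionAssigner` (Flink 1.x) as I understand it: the start slot comes from the topic name's hash, and partitions then go to consecutive slots modulo the parallelism. The topic names are made up for illustration; depending on how the two start indices land, the assigned slot ranges overlap and some slots stay idle:

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of the hash-based start index used by Flink's Kafka partition
// assigner (Flink 1.x). Topic names here are hypothetical examples.
public class PartitionSpread {
    static int assign(String topic, int partition, int parallelism) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
        return (startIndex + partition) % parallelism;
    }

    public static void main(String[] args) {
        int parallelism = 30;
        Set<Integer> used = new HashSet<>();
        for (String topic : new String[] {"topicA", "topicB"}) {
            for (int p = 0; p < 15; p++) {
                used.add(assign(topic, p, parallelism));
            }
        }
        // Each topic's 15 partitions occupy 15 consecutive slots (mod 30).
        // If the two topics' start indices land close together, the ranges
        // overlap, fewer than 30 distinct slots are used, and the rest idle.
        System.out.println("busy slots: " + used.size()
            + ", idle slots: " + (parallelism - used.size()));
    }
}
```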

Upvotes: 1
