rwachter

Reputation: 33

Kafka Streams partition assignment problem with multiple topics and scaling

I'm working on a Kafka Streams application that consumes three topics in a single consumer group. One topic has 20 partitions, another 10, and the last one 5, so the consumer group covers 35 partitions in total.

The Streams application runs in a Kubernetes environment and is scaled by running multiple instances of the application as pods in a single deployment. The goal is to scale to 35 pods (and thus 35 consumers) and have each partition assigned to a single consumer, allowing maximum parallelism.

However, what I see is co-partitioning as partitions are assigned while the application scales up: one consumer gets partition 0 from all three topics, another gets partition 1 from all three topics, and so on. This caps the maximum parallelism I can achieve at 20. If I had 35 consumers, only 20 would be active.

It is my understanding that I cannot break away from the co-partitioning behavior with Kafka Streams, as the partition assignment strategy is not configurable. It is behavior that I neither want nor need. I have considered a few solutions, but I'm not sure which approach is best, and I'm looking for a little direction on how to proceed.

  1. Accept that the maximum parallelism for this application will be the partition count of the largest topic in the consumer group. When lag on the topics is high, this leaves some consumers processing a lot of data and others processing very little.

  2. Have each topic consumed by a separate stream in its own consumer group. This is a problem because consumer groups operate independently, and by default there is no way to ensure that 35 consumers are assigned 35 partitions 1:1 across multiple consumer groups. There will very likely still be idle consumers.

  3. A variation of the above where each topic still gets its own consumer group/stream, but consumers are assigned to consumer groups dynamically as pods come up and down to keep the assignment balanced. This can be forced using the Kafka Admin API and the Kubernetes API, but it would be complicated and time-consuming to implement and maintain.

  4. Give all topics in the consumer group the same number of partitions; for example, all three topics with 20 partitions. This ensures that with 20 consumers, all 20 consumers are assigned partitions. The downside is that I am on Confluent Cloud, so this comes at an increased cost, but it is by far the simplest solution (a sketch of the partition increase follows this list).
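
For reference, increasing partition counts for option #4 can be scripted with the Kafka Admin API. A minimal sketch, where the bootstrap address and the topic-b/topic-c names are hypothetical placeholders for my 10- and 5-partition topics:

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitions;

public class EqualizePartitions {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical address

        try (Admin admin = Admin.create(props)) {
            // Raise the smaller topics to 20 partitions each. Partition counts
            // can only be increased, never decreased, and existing keyed
            // records are not re-shuffled into the new partitions.
            admin.createPartitions(Map.of(
                    "topic-b", NewPartitions.increaseTo(20),
                    "topic-c", NewPartitions.increaseTo(20)))
                 .all().get();
        }
    }
}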

I'm leaning towards #1 or #4 as a solution, but I'm curious whether my understanding is wrong or there is an easier/better solution out there.

Thank you!

Upvotes: 1

Views: 42

Answers (1)

Matthias J. Sax

Reputation: 62350

What you observe is by design. In the end, Kafka Streams does not scale via partitions, but via tasks. So you need to break your program into smaller independent "pieces" to get more tasks.

Given that you are reading from three input topics, I assume you are using something like this:

streamsBuilder.stream("t1, "t2", "t3").map(...)...

This program effectively merges the three input topics into a single KStream, and thus you get only a single sub-topology, which will scale to N tasks, with N = max(t1.partitions, t2.partitions, t3.partitions), as you observed.

Cf. https://docs.confluent.io/platform/current/streams/architecture.html#sub-topologies-also-called-sub-graphs

However, you can re-write your program into three independent pieces and thus create a sub-topology per input topic:

streamsBuilder.stream("t1).map(...)...
streamsBuilder.stream("t2).map(...)...
streamsBuilder.stream("t3).map(...)...

(To avoid boilerplate code, you can extract a helper method that takes a KStream as input and applies the actual business logic, and call the helper once for each of the three KStreams, one per topic; a sketch follows below.)
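
A minimal sketch of that refactoring; the topic names, the applyBusinessLogic helper, and the toUpperCase placeholder are hypothetical stand-ins for your actual program:

import java.util.List;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class PerTopicTopology {

    // Hypothetical helper applying the shared business logic to one KStream.
    static void applyBusinessLogic(KStream<String, String> stream, String outputTopic) {
        stream
            .mapValues(v -> v.toUpperCase()) // placeholder for the real logic
            .to(outputTopic);
    }

    static StreamsBuilder buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        // One stream() call per topic creates one sub-topology per topic,
        // so each topic scales independently to its own partition count.
        for (String topic : List.of("t1", "t2", "t3")) {
            applyBusinessLogic(builder.stream(topic), topic + "-out");
        }
        return builder;
    }
}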

If you have three sub-topologies, each sub-topology gets its own set of tasks, and thus you should be able to scale out to 35 tasks in your setup, as each sub-topology now scales to N tasks for the N partitions of its corresponding input topic.

To understand the structure of your program, you can inspect the TopologyDescription you get from Topology#describe().
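
For example, assuming the same hypothetical t1/t2/t3 topics as above, printing the description shows how many sub-topologies the program has:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

public class DescribeTopology {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("t1").mapValues(v -> v);
        builder.stream("t2").mapValues(v -> v);
        builder.stream("t3").mapValues(v -> v);

        Topology topology = builder.build();
        // With three independent stream() calls, the printed description
        // should list three sub-topologies, one per input topic.
        System.out.println(topology.describe());
    }
}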

Upvotes: 0
