donare

Reputation: 1

In Kafka Streams, how do you parallelize complex operations (or sub-topologies) using multiple topics and partitions?

I am currently trying to understand how Kafka Streams achieves parallelism. My main concern boils down to three questions:

  1. Can multiple sub-topologies read from the same partition?
  2. How can you parallelise a complex operation (making up a sub-topology) that uses the processor API and requires reading the entire topic?
  3. Can multiple sub-topologies read from the same topic (such that independent and expensive operations on the same topic can be run in different sub-topologies)?

As developers, we don't have direct control over how topologies are divided into sub-topologies. Kafka Streams divides the topology into multiple sub-topologies, using topics as a "bridge" where possible. Additionally, multiple stream tasks are created, each reading a subset of data from the input topic, divided by partition. The documentation reads:

Slightly simplified, the maximum parallelism at which your application may run is bounded by the maximum number of stream tasks, which itself is determined by maximum number of partitions of the input topic(s) the application is reading from.
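The quoted rule can be sketched as a tiny model. This is my own simplification for illustration, not Kafka code; the helper `maxStreamTasks` is hypothetical and just restates the documentation's bound (tasks are bounded by the maximum partition count of the input topics):

```java
import java.util.Collections;
import java.util.List;

public class TaskBound {
    // Hypothetical helper: given the partition counts of a sub-topology's
    // input topics, return the upper bound on stream tasks per the quoted
    // documentation ("maximum number of partitions of the input topic(s)").
    public static int maxStreamTasks(List<Integer> inputTopicPartitions) {
        return Collections.max(inputTopicPartitions);
    }

    public static void main(String[] args) {
        // One input topic with 4 partitions -> at most 4 tasks.
        System.out.println(maxStreamTasks(List.of(4)));
        // Two input topics with 4 and 2 partitions -> bounded by the maximum.
        System.out.println(maxStreamTasks(List.of(4, 2)));
    }
}
```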


Assume there was a sub-topology that reads multiple input topics whose partition counts are not identical. If the above excerpt of the documentation is to be believed, one or more partitions of the topic with fewer partitions would need to be assigned to multiple stream tasks (if both topics need to be read for the logic to work). However, this should not be possible because, as I understand it, multiple instances of the Streams application (each sharing the same application id) act as one consumer group, where each partition is assigned only once. In that case, the number of tasks created for a sub-topology should actually be limited by the minimum number of partitions of its input topics, i.e. a single partition is only assigned to one task.
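The "each partition is assigned only once" intuition can be modeled as follows. This is a simplified sketch of my own (not Kafka internals): assuming co-partitioned input topics, task i owns partition i of every input topic, so no partition appears in two tasks:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TaskAssignment {
    // Simplified model (an assumption for illustration, not Kafka's actual
    // assignor): with co-partitioned input topics, task i is assigned
    // partition i of every input topic, so each topic-partition is
    // consumed by exactly one task.
    public static Map<Integer, List<String>> assign(List<String> topics, int partitions) {
        Map<Integer, List<String>> tasks = new HashMap<>();
        for (int p = 0; p < partitions; p++) {
            List<String> owned = new ArrayList<>();
            for (String t : topics) {
                owned.add(t + "-" + p); // topic-partition name, e.g. "left-0"
            }
            tasks.put(p, owned);
        }
        return tasks;
    }

    public static void main(String[] args) {
        // Two co-partitioned topics, 2 partitions each -> 2 tasks.
        System.out.println(assign(List.of("left", "right"), 2));
    }
}
```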

I am not sure if the initial problem, i.e. a non-co-partitioned sub-topology, would actually occur. If an operation requires reading both input topics, the data would probably need to be co-partitioned (as in joins).


Say there was an expensive operation between two topics (possibly built from multiple custom processors) that requires the data of one topic to always be available in its entirety. You would want to parallelise this operation into multiple tasks.

If the topic had just one partition, and a partition could be read multiple times, this would not be a problem. However, as discussed earlier, I don't believe this to work.

Then there are GlobalKTables. However, there is no way to use GlobalKTables with custom processors (toStream is not available).

Another idea would be to broadcast the data into multiple partitions, essentially duplicating it by the partition count. This way, multiple stream tasks could be created for the topology to read the same data. To do this, a custom partitioner could be specified in the Produced instance given to KStream#to. If this data duplication can be accepted, this seems to be the only way to achieve what I have in mind.
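The duplication idea can be sketched in plain Java. Note this is not a Kafka Streams API: a classic StreamPartitioner picks a single partition per record, so "broadcasting" would in practice mean producing one copy of the record per partition; the `broadcast` helper and `PartitionedRecord` type below are hypothetical stand-ins for that:

```java
import java.util.ArrayList;
import java.util.List;

public class Broadcast {
    // Simple stand-in for a produced record: (target partition, value).
    record PartitionedRecord(int partition, String value) {}

    // Sketch of the idea from the text: instead of letting the partitioner
    // pick one partition, emit a copy of each record to every partition,
    // so every downstream stream task sees the full data set.
    public static List<PartitionedRecord> broadcast(String value, int numPartitions) {
        List<PartitionedRecord> copies = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            copies.add(new PartitionedRecord(p, value));
        }
        return copies;
    }

    public static void main(String[] args) {
        // One logical record, duplicated into 3 partitions.
        System.out.println(broadcast("payload", 3));
    }
}
```

The cost is exactly what the text says: the topic now stores the data once per partition.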


Regarding question number three: because the Streams application is one consumer group, I would also expect this not to be possible. With my current understanding, this would require writing the data into multiple identical topics (again, essentially duplicating the data) so that independent sub-topologies can be created. An alternative would be to run separate Streams applications (so that a different consumer group is used).

Upvotes: 0

Views: 1239

Answers (1)

OneCricketeer

Reputation: 191963

Without seeing your topology definition, this is a somewhat vague question. You can have repartition and changelog topics; these duplicate data from the original input topic.

But stateless operators like map, filter, etc. pass data through from the same (assigned) partitions for each thread.

A "sub-topology" is still part of only one application.id, thus one consumer group, so no, it cannot read the same topic partitions more than once. For that, you'd need independent streams/tables via branching operations within the whole topology. For example, filtering numbers into even and odd branches only consumes the topic once; you don't need to "broadcast" records to all partitions, and I'm not sure that's even possible out of the box (to sends records one-to-one, and Produced defines serialization, not multiple partitions). If you need to cross-reference different operators, you can use joins / state stores / KTables.
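The even/odd branching idea can be modeled without Kafka at all. This is an illustrative sketch (in real Kafka Streams you would use the DSL's branching operators): each record is read once and routed to exactly one branch by a predicate:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class Branching {
    // Simplified model of the even/odd example: the input is consumed once,
    // and each record is routed to exactly one branch by a predicate,
    // so no partition needs to be read twice.
    public static Map<String, List<Integer>> branch(List<Integer> records) {
        Map<String, List<Integer>> branches = new LinkedHashMap<>();
        branches.put("even", new ArrayList<>());
        branches.put("odd", new ArrayList<>());
        for (int n : records) {
            branches.get(n % 2 == 0 ? "even" : "odd").add(n);
        }
        return branches;
    }

    public static void main(String[] args) {
        System.out.println(branch(List.of(1, 2, 3, 4)));
    }
}
```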


None of this is really related to parallelism. You have num.stream.threads, or you can run multiple instances of the same JVM process to scale.

Upvotes: 0
