Reputation: 5904
I have a Kafka Streams application that receives data from topic-1 as a KStream and from topic-2 as a KTable. Both topics have 4 partitions each. Say I have 4 instances of the application running; each instance will then receive data from a single partition of topic-1. What about topic-2, which is read as a KTable? Will all instances receive data from all 4 partitions in that case? If both topics are keyed the same way, then I guess Kafka Streams will ensure that the same partitions are assigned to a given instance. If topic-2 doesn't have any keys, and the application instead infers the key from the value itself, then all instances would need to get all partitions of topic-2. How does Kafka Streams handle this situation?
Thank you!
Upvotes: 6
Views: 2854
Reputation: 62285
KTables are sharded according to the input partitions. Thus, similar to a KStream, each instance will get one topic-partition assigned and materialize this topic-partition as a shard of the KTable. Kafka Streams makes sure that same-numbered partitions of the different topics are co-located, i.e., one instance will get assigned topic-1 partition-0 and topic-2 partition-0 (and so forth).
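To make the co-location concrete, here is a minimal plain-Java sketch of the idea. It is a model, not Kafka Streams code: the class `CoPartitionSketch` and both of its methods are hypothetical names, and `hashCode()` is only a stand-in for the real partitioner (Kafka's default producer partitioner hashes the serialized key with murmur2). It shows why equal keys in both topics end up in the same task as long as both topics use the same partitioner and the same partition count.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only; none of these names exist in Kafka Streams.
class CoPartitionSketch {

    // Stand-in partitioner (assumption): the same key always maps to the
    // same partition number for a given partition count.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // One task per partition number: task p reads partition p of every
    // co-partitioned input topic.
    static List<List<String>> tasks(List<String> topics, int numPartitions) {
        List<List<String>> tasks = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            List<String> assigned = new ArrayList<>();
            for (String topic : topics) {
                assigned.add(topic + " partition-" + p);
            }
            tasks.add(assigned);
        }
        return tasks;
    }

    public static void main(String[] args) {
        List<List<String>> tasks = tasks(List.of("topic-1", "topic-2"), 4);
        for (int i = 0; i < tasks.size(); i++) {
            System.out.println("task " + i + " -> " + tasks.get(i));
        }
        // A record with this key lands in the same partition number in both
        // topics, so the stream record and the table row meet in one task.
        System.out.println("key user-42 -> partition " + partitionFor("user-42", 4));
    }
}
```

With 4 instances, each instance would run one of these 4 tasks; with fewer instances, some instance would run several tasks, but a task's pair of partitions always stays together.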
If topic-2 has no key set, data will be randomly distributed across its partitions. For this case, you can use a GlobalKTable instead. A GlobalKTable is a full replica of all partitions on each instance. If you do a KStream-GlobalKTable join, you can specify a "mapper" that extracts the join attribute from each stream record (i.e., you can extract the join attribute from the value).
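The lookup behaviour can be sketched in plain Java. This is a model of the semantics, not the Kafka Streams API: the class `GlobalTableJoinSketch`, its `join` method, and the sample record format are all hypothetical. In the real Java DSL the corresponding calls are `StreamsBuilder#globalTable` and `KStream#join(GlobalKTable, KeyValueMapper, ValueJoiner)`, where the `KeyValueMapper` receives the stream record's key and value and returns the key to look up in the table.

```java
import java.util.Map;
import java.util.function.BiFunction;

// Illustrative model only; none of these names exist in Kafka Streams.
class GlobalTableJoinSketch {

    // Models an inner stream-to-global-table join for one stream record.
    // `table` plays the role of the full local copy held by every instance.
    // Returns null when the record produces no join result.
    static <K, V, GK, GV, R> R join(K key, V value,
                                    Map<GK, GV> table,
                                    BiFunction<K, V, GK> keySelector,
                                    BiFunction<V, GV, R> joiner) {
        GK lookupKey = keySelector.apply(key, value);
        GV tableValue = (lookupKey == null) ? null : table.get(lookupKey);
        return (tableValue == null) ? null : joiner.apply(value, tableValue);
    }

    public static void main(String[] args) {
        // Every instance holds all rows, so any stream partition can be joined.
        Map<String, String> customers = Map.of("cust-3", "Alice");

        // Assumed record format: the join attribute is embedded in the value
        // as "<customerId>:<item>"; the stream key is not used for the lookup.
        String result = join("order-7", "cust-3:widget", customers,
                (k, v) -> v.split(":")[0],
                (v, name) -> v + " for " + name);

        System.out.println(result); // prints "cust-3:widget for Alice"
    }
}
```

Because the lookup key is computed per record, the stream does not need to be partitioned by that key, which is what removes the co-partitioning requirement.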
Note: a KStream-GlobalKTable join has different semantics than a KStream-KTable join. In contrast to the latter, it is not time-synchronized, and thus the join is non-deterministic by design with regard to GlobalKTable updates; i.e., there is no guarantee which KStream record will be the first to "see" a GlobalKTable update and thus join with the updated GlobalKTable record.
Upvotes: 5