Darren

Reputation: 792

Kafka Streams - joining partitioned topics

My understanding is that Kafka Streams supports partitioning. I am wondering how that works when joining data from two different topics. I assume that in order to join data from two different topics, the client app must somehow guarantee that the messages it gets from both topics share the same key. How does Kafka Streams do this?

Upvotes: 4

Views: 3058

Answers (2)

Darren

Reputation: 792

A piece of the puzzle is making sure that Kafka Streams gets allocated the same partition number for both topics. To guarantee this, it connects to both topics using the same consumer instance and then relies on a range assignor strategy to get the same partition number. See https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html
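A minimal sketch of the range idea this answer refers to, assuming every consumer subscribes to both topics and both topics have the same partition count. Each topic is split independently into contiguous ranges over the sorted consumer IDs, so consumer i ends up with the same partition numbers from both topics. Kafka Streams actually installs its own internal partition assignor, so treat this as an illustration of the range strategy, not what Streams literally runs; the function name `range_assign` is hypothetical.

```python
def range_assign(consumers, partitions_per_topic):
    """Range-style assignment sketch.

    consumers: iterable of consumer IDs.
    partitions_per_topic: dict topic -> number of partitions.
    Returns dict consumer -> list of (topic, partition).
    """
    members = sorted(consumers)
    assignment = {m: [] for m in members}
    for topic, n in sorted(partitions_per_topic.items()):
        # Each topic is handled independently: split 0..n-1 into contiguous ranges.
        per_consumer, extra = divmod(n, len(members))
        for i, member in enumerate(members):
            # The first `extra` consumers get one additional partition.
            start = i * per_consumer + min(i, extra)
            length = per_consumer + (1 if i < extra else 0)
            assignment[member].extend((topic, p) for p in range(start, start + length))
    return assignment


print(range_assign(["c2", "c1"], {"orders": 4, "payments": 4}))
```

With equal partition counts, `c1` gets partitions 0 and 1 of both `orders` and `payments`, and `c2` gets 2 and 3 of both, which is exactly the alignment a join needs.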

Upvotes: 0

YamYamm

Reputation: 401

There are a couple of prerequisites for stream-stream, KTable-KTable, or stream-KTable joins:

  • Topics need to be co-partitioned, meaning they must have the same number of partitions. This is a hard requirement: the Streams API won't allow a join if the topics are not co-partitioned, and it will throw a TopologyBuilderException at runtime, when partitions are about to be assigned.
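A rough sketch of the co-partitioning rule, assuming a simplified model where each join "task" reads partition p of every joined topic. If the partition counts differ, a given key could land on different partition numbers in each topic, so the sketch refuses to build the join, much as Streams does. `build_join_tasks` and `NotCoPartitionedError` are hypothetical names, not Kafka APIs.

```python
class NotCoPartitionedError(Exception):
    """Hypothetical stand-in for the exception Kafka Streams raises."""


def build_join_tasks(partition_counts):
    """Group partitions of the joined topics into tasks by partition number.

    partition_counts maps topic name -> number of partitions.
    Returns dict task_id -> list of (topic, partition).
    """
    if not partition_counts:
        raise ValueError("need at least one topic")
    counts = set(partition_counts.values())
    if len(counts) != 1:
        # Different partition counts: the same key could map to different
        # partition numbers in each topic, so the join would be wrong.
        raise NotCoPartitionedError(f"topics not co-partitioned: {partition_counts}")
    (n,) = counts
    topics = sorted(partition_counts)
    # Task p reads partition p of every joined topic.
    return {p: [(topic, p) for topic in topics] for p in range(n)}


print(build_join_tasks({"orders": 3, "payments": 3}))
```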

Beyond this requirement, any join will run, but for it to work correctly a number of additional requirements must be met, such as:

  • Both topics should use the same key schema. For example, if one topic uses userName as the key and the other uses userSurname, the join operation will run but most probably won't produce any meaningful output.
  • Producer applications writing to the joined topics should use the same partitioning strategy. That way, the same key ends up in the same partition number in both topics, and those partitions are the ones assigned to be joined together.
  • Both topics should use the same message timestamp type (LogAppendTime or CreateTime). This one is not a requirement per se, but it should be considered for windowed joins: message timestamps are used to determine which messages to join together, so if the topics use different message timestamp types, the result can be hard-to-find bugs.
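To illustrate why producers need the same partitioning strategy, here is a sketch of hash-based partitioning. It is, to the best of our knowledge, a Python port of the murmur2 hash (seed 0x9747b28c) that Kafka's Java default partitioner applies to serialized keys, followed by `(hash & 0x7fffffff) % numPartitions`; verify it against the client's own implementation before relying on it for interoperability. It assumes both producers serialize the key to identical bytes (same key schema and serializer). `default_partition` and `co_located` are hypothetical helper names.

```python
def murmur2(data: bytes) -> int:
    """32-bit murmur2 variant (believed to match Kafka's Java client)."""
    mask = 0xFFFFFFFF
    m, r = 0x5BD1E995, 24
    length = len(data)
    h = (0x9747B28C ^ length) & mask
    # Mix 4 bytes at a time (little-endian words).
    for i in range(length // 4):
        i4 = i * 4
        k = data[i4] | (data[i4 + 1] << 8) | (data[i4 + 2] << 16) | (data[i4 + 3] << 24)
        k = (k * m) & mask
        k ^= k >> r
        k = (k * m) & mask
        h = (h * m) & mask
        h ^= k
    # Mix the remaining 1-3 tail bytes (mirrors the fall-through switch in Java).
    tail, rem = length & ~3, length % 4
    if rem == 3:
        h ^= data[tail + 2] << 16
    if rem >= 2:
        h ^= data[tail + 1] << 8
    if rem >= 1:
        h ^= data[tail]
        h = (h * m) & mask
    # Final avalanche.
    h ^= h >> 13
    h = (h * m) & mask
    h ^= h >> 15
    return h


def default_partition(key: bytes, num_partitions: int) -> int:
    """Partition for a non-null key: positive hash modulo partition count."""
    return (murmur2(key) & 0x7FFFFFFF) % num_partitions


def co_located(key: bytes, partitions_a: int, partitions_b: int) -> bool:
    """True if `key` lands on the same partition number in both topics."""
    return default_partition(key, partitions_a) == default_partition(key, partitions_b)


for key in [b"alice", b"bob", b"carol"]:
    print(key, default_partition(key, 6), co_located(key, 6, 6), co_located(key, 6, 3))
```

With the same strategy and the same partition count, every key is co-located by construction; with different counts (6 vs 3 above) or a different partitioner on one side, some keys may land on different partition numbers and silently miss their join partners.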

GlobalKTable joins don't have any of these requirements and will work with any topic regardless of partition count, partitioning strategy, etc., because all of the data for a GlobalKTable is presented to every single Streams instance.
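A small simulation of why a GlobalKTable join needs no co-partitioning, under the simplifying assumption that topics are plain dicts of partition -> records. Every instance materializes the table from all partitions, so a stream record can find its match no matter which partition either side lives in. The `key_mapper` argument loosely mirrors the idea of deriving the lookup key from the stream record in a KStream-GlobalKTable join; all function names here are hypothetical.

```python
def build_global_table(table_topic):
    """Materialize a table from ALL partitions of the topic (as every instance does).

    table_topic: dict partition -> list of (key, value), in offset order.
    """
    table = {}
    for partition in sorted(table_topic):
        for key, value in table_topic[partition]:
            table[key] = value  # later updates for a key overwrite earlier ones
    return table


def join_with_global(stream_records, global_table, key_mapper):
    """Inner join: emit (key, (stream_value, table_value)) when a match exists."""
    out = []
    for key, value in stream_records:
        lookup = key_mapper(key, value)
        if lookup in global_table:
            out.append((key, (value, global_table[lookup])))
    return out


users = {0: [("u1", "Alice")], 1: [("u2", "Bob")], 2: [("u3", "Carol")]}
# This instance only owns some stream records, keyed by order ID, not user ID.
orders = [("o1", "u2"), ("o2", "u9")]
print(join_with_global(orders, build_global_table(users), lambda k, v: v))
```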

When messages are produced, they are sent to partitions based on their key and the partitioning strategy. The Streams API assigns the same partition number from each topic to the same processor, so all relevant messages from both topics that share a key are processed by the same processor. For windowed joins, message timestamps are used to find the messages to join within a particular window, and the result is emitted once the join is done.
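A batch-style sketch of the windowed stream-stream join described above, assuming a symmetric window: two records join when they have the same key and their timestamps are at most `window_ms` apart. Real Kafka Streams buffers records in window stores and emits results incrementally as records arrive; this nested loop, and the name `windowed_join`, are only illustrative. It also shows why the timestamp type matters: the comparison is done purely on whatever timestamps the records carry.

```python
def windowed_join(left, right, window_ms):
    """Join records with equal keys whose timestamps differ by <= window_ms.

    left, right: lists of (key, value, timestamp_ms), e.g. from one task's
    co-partitioned input partitions.
    """
    results = []
    for lk, lv, lts in left:
        for rk, rv, rts in right:
            if lk == rk and abs(lts - rts) <= window_ms:
                results.append((lk, lv, rv))
    return results


clicks = [("u1", "click", 1_000), ("u2", "click", 5_000)]
buys = [("u1", "buy", 1_500), ("u2", "buy", 20_000)]
print(windowed_join(clicks, buys, window_ms=1_000))
```

Here only `u1` joins: its records are 500 ms apart, while the `u2` records are 15 s apart and fall outside the 1 s window.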

Upvotes: 8
