I am trying to create a table from a stream read from a topic, changing the key while the value stays the same. Is it possible to avoid repartitioning?
    streamsBuilder.stream(
        TOPIC,
        Consumed.with(IdSerde(), ValueSerde())
    )
        .peek { key, value -> logger.info("Consumed $TOPIC, key: $key, value: $value") }
        .filter { _, value -> value != null }
        .selectKey(
            { _, value -> NewKey(value.newKey.toString()) },
            Named.`as`("changeKey")
        )
        .toTable(
            Materialized.`as`<NewKey, Value, KeyValueStore<Bytes, ByteArray>>(
                NEW_TABLE_NAME
            )
                .withKeySerde(NewKeySerde())
                .withValueSerde(ValueSerde())
        )
    return streamsBuilder.build()
Upvotes: 0
Views: 1059
The only way I can think of is for your producer to send the data to the input topic with the appropriate key already set, rather than selecting the key in the streams application.
Otherwise, the answer is no.
Before going into the explanation, a bit of background on KTable:
A KTable is an abstraction of a changelog stream. This means a KTable holds only the latest value for a given key.
Consider the following four records (in order) being sent to the stream:
("alice", 1) --> ("bob", 100) --> ("alice", 3) --> ("bob", 50)
For the records above, the KTable would look as follows (alice and bob being the message keys):
("alice", 3)
("bob", 50)
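The "latest value per key" behaviour can be sketched in plain Kotlin without any Kafka dependency. This is only an illustration of the semantics, not how Kafka Streams stores state; `latestByKey` is a hypothetical helper name.

```kotlin
// Replays a list of (key, value) records into a map.
// A later record for the same key overwrites the earlier one,
// which mirrors how a KTable keeps only the latest value per key.
fun latestByKey(records: List<Pair<String, Int>>): Map<String, Int> {
    val table = linkedMapOf<String, Int>()
    for ((key, value) in records) {
        table[key] = value
    }
    return table
}

fun main() {
    val records = listOf("alice" to 1, "bob" to 100, "alice" to 3, "bob" to 50)
    println(latestByKey(records)) // prints {alice=3, bob=50}
}
```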
Explanation
Whenever we run multiple instances of a Kafka Streams application, the KTable in each instance holds only the local state of that instance, that is, the data from the partitions assigned to it. Hence, repartitioning is required to ensure that messages with the same key land in the same partition.
To understand this better, let's consider an example. We have a topic with two partitions. Partition 0 has a single event and Partition 1 has three events, as follows (null represents no key):
Topic Partition 0: (null, {"name": "alice", "count": 1})
Topic Partition 1: (null, {"name": "alice", "count": 3}), (null, {"name": "bob", "count": 100}), (null, {"name": "bob", "count": 50})
We create a Kafka Streams application that reads data from this topic and creates a KTable using the name field as the key. The streams application runs with two instances, and each instance is assigned a single partition, as shown below:
Topic Partition 0: -----> Instance 1
Topic Partition 1: -----> Instance 2
Since KTables are maintained locally per instance, if no repartitioning is done, the KTable will be in an inconsistent state across the two instances: the key "alice" ends up in both, with different values. This is shown below:
Instance 1 KTable : ("alice", {"count":1})
Instance 2 KTable : ("alice", {"count":3}), ("bob", {"count":50})
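The scenario above can be simulated in plain Kotlin, as a rough sketch. Everything here is hypothetical and for illustration only: `Event`, `localTables` and `repartition` are made-up names, and routing by `hashCode()` is a simplification (Kafka's default partitioner hashes the serialized key bytes instead).

```kotlin
data class Event(val name: String, val count: Int)

// Builds one local table per partition, keyed by name, the way an instance
// would if it only ever saw its own assigned partition.
// associate keeps the last value when a key repeats.
fun localTables(partitions: List<List<Event>>): List<Map<String, Int>> =
    partitions.map { events -> events.associate { it.name to it.count } }

// Simplified stand-in for a repartition step: every event is routed to a
// partition chosen from its new key, so equal keys share a partition.
// Cross-partition ordering here is just the iteration order of the input.
fun repartition(partitions: List<List<Event>>, numPartitions: Int): List<List<Event>> {
    val out = List(numPartitions) { mutableListOf<Event>() }
    for (event in partitions.flatten()) {
        out[Math.floorMod(event.name.hashCode(), numPartitions)].add(event)
    }
    return out
}

fun main() {
    val input = listOf(
        listOf(Event("alice", 1)),
        listOf(Event("alice", 3), Event("bob", 100), Event("bob", 50))
    )
    // Without repartitioning, "alice" shows up in both local tables.
    println(localTables(input)) // prints [{alice=1}, {alice=3, bob=50}]
    // After repartitioning, each key lives in exactly one local table.
    println(localTables(repartition(input, 2)))
}
```

Without the repartition step two instances disagree about "alice"; with it, each key is owned by a single instance, which is the property a KTable needs.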
Hence, to avoid issues like the one above, Kafka Streams repartitions the data if a KTable is created after a selectKey operation.
Upvotes: 1