Reputation: 477
Using kafka-streams, I want to group a stream S
of elements E
by some key K1
while aggregating all values for that same key into a joined result AGG
. This results in a KTable T1
.
Depending on the aggregated result, the value should be repartitioned into another KTable T2
, grouped by a key K2
taken from the aggregated result AGG
. So the aggregated result should generate the key for the next regroup.
In the end I'm only interested in a KTable T2
where the key is K2
and the value is AGG
However, this does not work. I only get a KTable T
for the last value. Not a value for each key K2
I know the results of aggregation are only forwarded after some time, so I already tried to lower commit.interval.ms
to 1 but to no avail.
I also tried to use through
and write the intermediate results to a stream but that didn`t succeed as well.
val finalTable = streamBuilder.kstream("streamS")
.groupBy{ k, v -> createKey1(k, v) }
.aggregate(
{ Agg.empty() },
{ k, v, previousAgg ->
Agg.merge(previousAgg, v)
})
.toStream()
// .through("table1")
.groupBy { k1, agg -> agg.createKey2()}
.reduce{ _, agg -> agg }
For a stream S
containing the following values:
key1="123", id="1", startNewGroup="false"
key1="234", id="2", startNewGroup="false"
key1="123", id="3", startNewGroup="false"
key1="123", id="4", startNewGroup="true"
key1="234", id="5", startNewGroup="false"
key1="123", id="6", startNewGroup="false"
key1="123", id="7", startNewGroup="false"
key1="123", id="8", startNewGroup="true"
I would like the end result to be a KTable having the following latest key-values:
key: 123-1, value: 'key1="123", key2="123-1", ids="1,3"'
key: 234-2, value: 'key1="234", key2="234-2", ids="2,5"'
key: 123-4, value: 'key1="123", key2="123-4", ids="4,6,7"'
key: 123-8, value: 'key1="123", key2="123-8", ids="8"'
The original stream S
of elements is first grouped by key1
where the aggregated result contains the groupby key key1
and adds an extra field key2
containing a combination of key1
with the id
of the first occurrence.
Whenever the aggregation received a value with startNewGroup
set to true
, it returns an aggregation with the key2
field set to the key1
and the id
of the new value, effectively creating a new subgroup.
In the second regroup, we simple group by the key2
field.
However what we really observe is the following:
key: 234-2, value: 'key1="234", key2="234-2", ids="2,5"'
key: 123-8, value: 'key1="123", key2="123-8", ids="8"'
Upvotes: 1
Views: 187
Reputation: 6603
For your use case is better to use Processor API. Processor API can be easily combine with Kafka Streams DSL (Processor API integration).
You have to implement Custom Transformer, that will aggregate your messages for particular key in state store. When startNewGroup=true
message arrive old messages for the key will be forward to downstream and new aggregation will start
You Sample Transformer might look as follow:
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore
case class CustomTransformer(storeName: String) extends Transformer[String, Value, Agg] {
private var stateStore: KeyValueStore[String, Agg] = null
private var context: ProcessorContext = null
override def init(context: ProcessorContext): Unit = {
this.context = context
stateStore = context.getStateStore(storeName).asInstanceOf[KeyValueStore[String, Agg]]
}
override def transform(key: String, value: Value): Agg = {
val maybeAgg = Option(stateStore.get(key))
if (value.startNewGroup) {
maybeAgg.foreach(context.forward(key, _))
stateStore.put(key, Agg(value))
}
else
stateStore.put(key, maybeAgg.map(_.merge(value)).getOrElse(Agg(value)))
null
}
override def close(): Unit = {}
}
Upvotes: 1