Jan Bols

Reputation: 477

regrouping with KStream semantics

Using kafka-streams, I want to group a stream S of elements E by some key K1 while aggregating all values for that same key into a joined result AGG. This results in a KTable T1.

Depending on the aggregated result, the value should be repartitioned into another KTable T2, grouped by a key K2 taken from the aggregated result AGG. So the aggregated result should generate the key for the next regroup.

In the end, I'm only interested in a KTable T2 where the key is K2 and the value is AGG.

However, this does not work. The resulting KTable only holds the last value for each key K1, not one value for each key K2.

I know the results of aggregation are only forwarded after some time, so I already tried to lower commit.interval.ms to 1 but to no avail.

I also tried using through to write the intermediate results to a topic, but that didn't work either.

val finalTable = streamBuilder.stream("streamS")
                    .groupBy{ k, v -> createKey1(k, v) }
                    .aggregate(
                            { Agg.empty() },
                            { k, v, previousAgg ->
                                Agg.merge(previousAgg, v)
                            })
                    .toStream()
//                    .through("table1")
                    .groupBy { k1, agg -> agg.createKey2()}
                    .reduce{ _, agg -> agg }

For a stream S containing the following values:
key1="123", id="1", startNewGroup="false"
key1="234", id="2", startNewGroup="false"
key1="123", id="3", startNewGroup="false"
key1="123", id="4", startNewGroup="true"
key1="234", id="5", startNewGroup="false"
key1="123", id="6", startNewGroup="false"
key1="123", id="7", startNewGroup="false"
key1="123", id="8", startNewGroup="true"

I would like the end result to be a KTable having the following latest key-values:
key: 123-1, value: 'key1="123", key2="123-1", ids="1,3"'
key: 234-2, value: 'key1="234", key2="234-2", ids="2,5"'
key: 123-4, value: 'key1="123", key2="123-4", ids="4,6,7"'
key: 123-8, value: 'key1="123", key2="123-8", ids="8"'

The original stream S of elements is first grouped by key1. The aggregated result contains the group-by key key1 plus an extra field key2, which combines key1 with the id of the first occurrence.
Whenever the aggregation receives a value with startNewGroup set to true, it returns an aggregate whose key2 field is set to key1 plus the id of the new value, effectively creating a new subgroup.
In the second regroup, we simply group by the key2 field.

However, what we actually observe is the following:
key: 234-2, value: 'key1="234", key2="234-2", ids="2,5"'
key: 123-8, value: 'key1="123", key2="123-8", ids="8"'

Upvotes: 1

Views: 187

Answers (1)

Bartosz Wardziński

Reputation: 6603

For your use case, it is better to use the Processor API. The Processor API can easily be combined with the Kafka Streams DSL (Processor API integration).

You have to implement a custom Transformer that aggregates the messages for a particular key in a state store. When a message with startNewGroup=true arrives, the old aggregate for that key is forwarded downstream and a new aggregation starts.
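The forward-on-new-group logic can be checked without Kafka. The following is only a minimal sketch: Value and Agg are hypothetical model types (the question does not show them), an immutable Map stands in for the state store, and the returned vector stands in for what would be forwarded downstream.

```scala
// Hypothetical model types: the question does not show Value or Agg.
final case class Value(key1: String, id: String, startNewGroup: Boolean)

final case class Agg(key1: String, key2: String, ids: Vector[String]) {
  // Append one more element to the currently open group.
  def merge(v: Value): Agg = copy(ids = ids :+ v.id)
}

object Agg {
  // A new subgroup is keyed by key1 plus the id of its first element.
  def apply(v: Value): Agg = Agg(v.key1, s"${v.key1}-${v.id}", Vector(v.id))
}

object FlushOnNewGroup {
  // Returns (groups forwarded downstream, in order; groups still open in the store).
  def run(input: Seq[Value]): (Vector[Agg], Map[String, Agg]) =
    input.foldLeft((Vector.empty[Agg], Map.empty[String, Agg])) {
      case ((forwarded, store), v) =>
        store.get(v.key1) match {
          case Some(open) if v.startNewGroup =>
            // Close the old group and start a new one from this value.
            (forwarded :+ open, store.updated(v.key1, Agg(v)))
          case Some(open) =>
            (forwarded, store.updated(v.key1, open.merge(v)))
          case None =>
            // First value seen for this key.
            (forwarded, store.updated(v.key1, Agg(v)))
        }
    }
}
```

Fed the eight sample records from the question, run forwards the groups 123-1 and 123-4 and leaves 123-8 and 234-2 open in the store. Note that with this logic an open group is only emitted once a later startNewGroup=true record arrives for the same key.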

Your sample Transformer might look as follows:

import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore

// The result type is KeyValue[String, Agg] because KStream#transform expects
// a transformer that emits key-value pairs.
case class CustomTransformer(storeName: String) extends Transformer[String, Value, KeyValue[String, Agg]] {

  private var stateStore: KeyValueStore[String, Agg] = _
  private var context: ProcessorContext = _

  override def init(context: ProcessorContext): Unit = {
    this.context = context
    stateStore = context.getStateStore(storeName).asInstanceOf[KeyValueStore[String, Agg]]
  }

  override def transform(key: String, value: Value): KeyValue[String, Agg] = {
    val maybeAgg = Option(stateStore.get(key))

    if (value.startNewGroup) {
      // Emit the finished group, then start a new one from this value.
      maybeAgg.foreach(agg => context.forward(key, agg))
      stateStore.put(key, Agg(value))
    } else {
      // Extend the open group, or create it on the first value for this key.
      stateStore.put(key, maybeAgg.map(_.merge(value)).getOrElse(Agg(value)))
    }
    // Results are emitted via context.forward only, so nothing is returned here.
    null
  }

  override def close(): Unit = {}
}
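To plug such a transformer into the DSL, the state store has to be registered on the builder and named in the transform call. The following is only a sketch under several assumptions: the store name "agg-store" and the helper wire are made up for illustration, the serde for the aggregate is supplied by the caller, the transformer's result type is KeyValue[String, A] (which is what KStream#transform requires), and the Kafka Streams version in use still offers KStream#transform (newer releases deprecate it in favor of process).

```scala
import org.apache.kafka.common.serialization.{Serde, Serdes}
import org.apache.kafka.streams.{KeyValue, StreamsBuilder}
import org.apache.kafka.streams.kstream.{KStream, Transformer, TransformerSupplier}
import org.apache.kafka.streams.state.{KeyValueStore, StoreBuilder, Stores}

object TopologySketch {
  // Hypothetical store name, chosen only for this example.
  val StoreName = "agg-store"

  // Registers the store and attaches a stateful transformer to `source`.
  // `source` is assumed to be already keyed and partitioned by key1.
  def wire[V, A](
      builder: StreamsBuilder,
      source: KStream[String, V],
      aggSerde: Serde[A],
      newTransformer: String => Transformer[String, V, KeyValue[String, A]]
  ): KStream[String, A] = {
    val storeBuilder: StoreBuilder[KeyValueStore[String, A]] =
      Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore(StoreName),
        Serdes.String(),
        aggSerde)
    builder.addStateStore(storeBuilder)

    // The supplier must hand out a fresh transformer instance on every call.
    val supplier = new TransformerSupplier[String, V, KeyValue[String, A]] {
      override def get(): Transformer[String, V, KeyValue[String, A]] =
        newTransformer(StoreName)
    }

    // Naming the store here gives the transformer access to it in init().
    source.transform[String, A](supplier, StoreName)
  }
}
```

A call could then look like TopologySketch.wire(builder, keyedStream, aggSerde, name => CustomTransformer(name)), where keyedStream and aggSerde are placeholders for your own stream and serde. Since transform does not repartition by itself, a stream that was re-keyed to key1 with selectKey would presumably need to go through an intermediate topic first so that all records for one key reach the same store.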

Upvotes: 1
