John Black

Reputation: 355

How to keep the N latest values per key in a Kafka topic using Kafka Streams

Let's say I use Kafka Streams (the kafka-streams-scala library, version 2.2.0).

I need to keep the few most recent values for each key in a Kafka topic using Kafka Streams. I use them to enrich another stream, so I need something like a KTable or GlobalKTable, but those keep only one value per key.

I figured out one possible way to do this: create a stream and a mutable Map, and then use stream.foreach to keep track of the N most recent values for each key.

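```scala
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.KStream

val builder = new StreamsBuilder()
// Needs an implicit Serde[GenericRecord] (e.g. an Avro serde) in scope
val stream: KStream[String, GenericRecord] = builder.stream[String, GenericRecord]("topicName")
val map = scala.collection.mutable.Map.empty[String, List[MyObject]] // MyObject: my own type
stream.foreach((k, v) => {
  // update map with the N most recent values for key k
})
```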

My question is whether there is a better approach to achieve this, either using the Streams API or at least without a mutable map.

Upvotes: 4

Views: 1342

Answers (1)

miguno

Reputation: 15067

So I need something like KTable or GlobalKTable, but they keep only one value.

Keep using a KTable (or GlobalKTable), but use a structured value and/or a collection as "the value". Nothing in Kafka forces you to restrict a message value to a primitive data type (like Integer or String).
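As a minimal sketch of such a structured value (plain Scala, no Kafka dependency): `RecentValues` and `add` are hypothetical names, not part of Kafka Streams. The idea is only that "the N most recent values for a key" can itself be one immutable value stored in the table.

```scala
// Hypothetical value type: holds at most `maxSize` of the most recent values for one key,
// oldest first.
final case class RecentValues[A](maxSize: Int, values: Vector[A]) {
  require(maxSize > 0, "maxSize must be positive")

  // Returns a NEW instance with `v` appended, dropping the oldest entries beyond maxSize.
  def add(v: A): RecentValues[A] =
    copy(values = (values :+ v).takeRight(maxSize))
}

object RecentValues {
  def empty[A](maxSize: Int): RecentValues[A] = RecentValues(maxSize, Vector.empty)
}

object RecentValuesDemo {
  def main(args: Array[String]): Unit = {
    val r = Seq("a", "b", "c", "d").foldLeft(RecentValues.empty[String](3))(_ add _)
    println(r.values)
  }
}
```

Folding "a", "b", "c", "d" into an empty `RecentValues` with `maxSize = 3` leaves `Vector(b, c, d)`: the oldest value has been dropped.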

Think of a KStream<UserId, List<ClickEvent>>. Here, each message belongs to a particular user (identified by the key, a UserId), and each message value is a list of zero, one, or many ClickEvents associated with that user. This "just works"; you only need proper serdes (serializers/deserializers) for the data types you want to use.

For instance, the CustomStreamTableJoin example at https://github.com/confluentinc/kafka-streams-examples (direct link to the example for v5.2.1, which corresponds to Apache Kafka v2.2) uses a Pair class to store a tuple in Kafka's message value, along with its accompanying PairSerde. The same can be done (and developers do this) to store collections of values, like a List<ClickEvent>, as you mentioned for your own use case.
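At its core, a serde is a pair of functions between your type and bytes. The sketch below shows such a pair for a `List[String]` value in plain Scala; `StringListCodec` and its length-prefixed format are assumptions for illustration (a `List<ClickEvent>` would more typically be encoded with Avro, JSON, or similar).

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical codec for a List[String] message value: a 4-byte element count,
// then, for each element, a 4-byte length followed by its UTF-8 bytes.
object StringListCodec {
  def serialize(values: List[String]): Array[Byte] = {
    val encoded = values.map(_.getBytes(StandardCharsets.UTF_8))
    val buf = ByteBuffer.allocate(4 + encoded.map(4 + _.length).sum)
    buf.putInt(encoded.size)
    encoded.foreach { bytes =>
      buf.putInt(bytes.length) // length prefix
      buf.put(bytes)           // element payload
    }
    buf.array()
  }

  def deserialize(data: Array[Byte]): List[String] = {
    val buf = ByteBuffer.wrap(data)
    val count = buf.getInt()
    // List.fill evaluates its body once per element, in order
    List.fill(count) {
      val bytes = new Array[Byte](buf.getInt())
      buf.get(bytes)
      new String(bytes, StandardCharsets.UTF_8)
    }
  }
}
```

To use this from kafka-streams-scala, the two functions would be wrapped in a Kafka `Serde`, either by implementing Kafka's `Serializer`/`Deserializer` interfaces or through a helper such as `Serdes.fromFn` in the Scala wrapper (check its exact signature for your version). Kafka passes `null` to deserializers for tombstone records, which this sketch does not handle.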

I need to keep a few recent values for key in kafka topic using kafka streams. [...] I figured out one possible way to do this: creating stream and mutable Map, [...]

You shouldn't need to use a Map. The key is already available in the Kafka message, so you only need a List-like data type for the message value.

or at least without a mutable map.

You don't need to (and shouldn't) use a mutable data structure unless there is a specific reason, and I don't think there is one in your use case. When a new message is processed and the corresponding output is stored in the KTable, whatever was previously stored in the table for that key is overwritten, so using an immutable data structure as the message value is totally fine.
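To tie both points together, here is a plain-Scala model (no Kafka) of what the table ends up holding when a stream is aggregated into a "last N values" list. It assumes the route is `groupByKey` followed by `aggregate`, in kafka-streams-scala roughly `stream.groupByKey.aggregate(List.empty[GenericRecord])((k, v, agg) => (v :: agg).take(n))`, which also needs an implicit serde for the list type. `LastNTableModel`, `lastN`, and `simulate` are hypothetical names, and the `Map` only stands in for the table's state store so the sketch can run.

```scala
// Plain-Scala model (no Kafka) of the per-key aggregation Kafka Streams performs
// for `stream.groupByKey.aggregate(initializer)(aggregator)`.
object LastNTableModel {
  // Hypothetical aggregator: prepend the new value and keep the n newest (newest first).
  // It returns a new immutable List; the previous aggregate is never modified.
  def lastN[V](n: Int)(key: String, value: V, agg: List[V]): List[V] =
    (value :: agg).take(n)

  // The Map only stands in for the KTable's state store; in a real topology
  // Kafka Streams keeps this per-key state for you.
  def simulate[V](records: Seq[(String, V)], n: Int): Map[String, List[V]] =
    records.foldLeft(Map.empty[String, List[V]]) { case (table, (k, v)) =>
      val current = table.getOrElse(k, List.empty[V]) // initializer for a new key
      table.updated(k, lastN(n)(k, v, current))       // overwrite the key's old value
    }

  def main(args: Array[String]): Unit =
    println(simulate(Seq("u1" -> 1, "u2" -> 10, "u1" -> 2, "u1" -> 3), n = 2))
}
```

For the records u1→1, u2→10, u1→2, u1→3 with n = 2, the model ends up holding `List(3, 2)` for u1 and `List(10)` for u2. The aggregator itself never deals with keys or mutation; the resulting KTable could then be joined with the stream you want to enrich.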

Upvotes: 4
