Reputation: 355
Let's say I use Kafka Streams (the kafka-streams-scala library, version 2.2.0).
I need to keep the few most recent values for each key in a Kafka topic using Kafka Streams, and I use them to enrich another stream. So I need something like a KTable or GlobalKTable, but those keep only one value per key.
I figured out one possible way to do this: create a stream and a mutable Map, then use stream.foreach to keep track of the N most recent values for each key:
val stream: KStream[String, GenericRecord] = builder.stream[String, GenericRecord]("topicName")
val map = scala.collection.mutable.Map.empty[String, List[MyObject]]
stream.foreach { (k, v) =>
  // update map: record v as the newest value for k, keeping only the N most recent
}
My question is whether there is a better approach to achieve this, either using the Streams API or at least without a mutable map.
Upvotes: 4
Views: 1342
Reputation: 15067
So I need something like KTable or GlobalKTable, but they keep only one value.
Keep using a KTable (or GlobalKTable), but use a structured value and/or a collection as "the value". Nothing in Kafka forces you to restrict a message value to a primitive data type (like Integer or String).
Think: KStream<UserId, List<ClickEvent>>. Here, each message belongs to a particular user (identified by the key, a UserId), and each message has a list of zero, one, or many ClickEvents associated with that user. This "just works"; you only need proper serdes (serializers/deserializers) for the data types you want to use.
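To make "proper serdes" concrete, here is a minimal sketch, in plain Scala with no Kafka dependency, of the byte encoding a serde for a List[String] value could use. The StringListCodec name and the length-prefixed format are my own assumptions for illustration, and null values (tombstones) are not handled.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical codec: a 4-byte element count, then each element as a
// 4-byte length followed by its UTF-8 bytes. Null input is not handled.
object StringListCodec {
  def serialize(values: List[String]): Array[Byte] = {
    val encoded = values.map(_.getBytes(UTF_8))
    // 4 bytes for the count, plus 4 length bytes and the payload per element
    val size = 4 + encoded.map(4 + _.length).sum
    val buf = ByteBuffer.allocate(size)
    buf.putInt(encoded.length)
    encoded.foreach { bytes =>
      buf.putInt(bytes.length)
      buf.put(bytes)
    }
    buf.array()
  }

  def deserialize(data: Array[Byte]): List[String] = {
    val buf = ByteBuffer.wrap(data)
    val count = buf.getInt()
    // List.fill evaluates the block once per element, in order
    List.fill(count) {
      val bytes = new Array[Byte](buf.getInt())
      buf.get(bytes)
      new String(bytes, UTF_8)
    }
  }
}
```

To use this with Kafka, you would wrap the two functions in implementations of Kafka's Serializer and Deserializer interfaces and combine them with Serdes.serdeFrom(serializer, deserializer). For richer element types like ClickEvent, a schema-based format (e.g. Avro) is usually a better fit than a hand-rolled encoding.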
For instance, the CustomStreamTableJoin example at https://github.com/confluentinc/kafka-streams-examples (the example for v5.2.1 corresponds to Apache Kafka v2.2) uses a Pair class to store a tuple in Kafka's message value, together with its accompanying PairSerde. The same approach can be (and is being) used by developers to store collections of values, like a List<ClickEvent>, or the list of recent values you described for your own use case.
I need to keep a few recent values for key in kafka topic using kafka streams. [...] I figured out one possible way to do this: creating stream and mutable Map, [...]
You shouldn't need to use a Map. The key is already available in the Kafka message, so you only need a List-like data type for the message value.
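For that List-like value, the per-key update can be a pure function that returns a new bounded list. This is a minimal sketch; the appendRecent name and the newest-first ordering are assumptions, not part of any Kafka API.

```scala
// Hypothetical helper: prepend the newest value and keep at most n elements,
// newest first. The input list is never modified; a new list is returned.
object Recent {
  def appendRecent[V](recent: List[V], value: V, n: Int): List[V] =
    (value :: recent).take(n)
}
```

In kafka-streams-scala you would plug such a function into an aggregation, roughly stream.groupByKey.aggregate(List.empty[MyObject])((_, v, agg) => Recent.appendRecent(agg, v, n)), assuming the values have already been mapped to MyObject and an implicit Materialized with a list serde is in scope. Check the exact aggregate signature for your version.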
or at least without a mutable map.
You don't need to (and shouldn't) use a mutable data structure unless there is a specific reason, and I don't think there is one in your use case. When a new message is processed and the corresponding output is stored in the KTable, whatever was previously stored in the table for that key is overwritten, so using an immutable data structure as the message value is totally fine.
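Those overwrite semantics can be modeled in plain Scala (this is not Kafka code; TableModel and materialize are hypothetical names): the table is an immutable Map from each key to its latest list, and every record replaces that key's entry with a freshly built list.

```scala
// Plain-Scala model of a KTable's contents: only the latest value per key
// is kept, and each update replaces the previous entry for that key.
object TableModel {
  def materialize(records: Seq[(String, String)], n: Int): Map[String, List[String]] =
    records.foldLeft(Map.empty[String, List[String]]) { case (table, (key, value)) =>
      val previous = table.getOrElse(key, Nil)
      // A new immutable list is built; the old one is simply replaced in the map.
      table.updated(key, (value :: previous).take(n))
    }
}
```

Nothing here is mutated in place: each step produces a new map and a new list, mirroring how each new aggregate overwrites the old one in the table.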
Upvotes: 4