We have a situation in which I think Kafka Streams could help, but I cannot find any documentation or examples that show how.
I found one similar question, but it has no implementation advice: Kafka Streams wait function with depending objects
What I want to do:
I would like to correlate related records from a Kafka topic into a single object and publish that new object to a separate output topic. For example, there might be five message records that are related to each other by a unique key - I want to build a new object from those related objects, and produce it to a new topic.
I want all related events within a one-hour sliding window to be aggregated. In other words, as soon as a message A with ID “123” arrives at the consumer, the application must wait at least one hour for the remaining records with ID “123” to arrive. Once all records have arrived or the hour has passed, the records expire.
Finally, all related messages collected over the hour are used to create a new object, which is then sent to another Kafka topic.
Problems I have encountered:
The sliding window in Kafka Streams seems to work only when joining two streams together. We will have only one stream connected to the topic, and I do not understand why two streams are required or how we would implement this; I cannot find any examples online. All of the stream functions I have seen in Kafka Streams simply aggregate or reduce the events that share a key to a simple value, for example counting how many times a key appears or summing some field.
Here is some pseudo-code describing what I mean. The real function names and semantics will be different, if such functionality exists at all.
    KStream<Key, Object> kstream = kStreamBuilder.stream(TOPIC);
    kstream.windowedBy(
            // One-hour sliding window
        )
        .collectAllRelatedKeys(
            // Collect all records related to each key
            // map == HashMap<Key, ArrayList<Value>>
            map.get(key).add(value);
        )
        .transformAndProcess(
            if (ALL_EVENTS_COLLECTED) {
                // Create a new object from all related records
                newObject = createNewObjectFromRelatedRecordsFunction(map.get(key));
                producer.send(newObject);
            }
        )
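To pin down the semantics the pseudo-code is after, the collect-then-emit behaviour can be sketched in plain Java, independent of Kafka. This is a minimal sketch under stated assumptions: `RecordCorrelator`, `onRecord`, `onTimer` and `expectedCount` are hypothetical names, "all events collected" is assumed to mean a known number of records per key, and time is assumed to be event time in milliseconds. In Kafka Streams, comparable logic could live in a custom processor that keeps the buffers in a state store and expires them from a punctuation registered with `ProcessorContext#schedule`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical helper: buffers values per key and emits them as one batch once
// expectedCount values have arrived, or once windowMillis has elapsed since the
// first value for that key (whichever happens first).
public class RecordCorrelator<K, V> {
    private final int expectedCount;            // assumption: number of related records is known
    private final long windowMillis;            // e.g. one hour
    private final BiConsumer<K, List<V>> emit;  // stands in for producer.send(newObject)

    private final Map<K, List<V>> buffers = new HashMap<>();
    private final Map<K, Long> firstSeen = new HashMap<>();

    public RecordCorrelator(int expectedCount, long windowMillis, BiConsumer<K, List<V>> emit) {
        this.expectedCount = expectedCount;
        this.windowMillis = windowMillis;
        this.emit = emit;
    }

    // Called for every incoming record.
    public void onRecord(K key, V value, long timestampMillis) {
        buffers.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        firstSeen.putIfAbsent(key, timestampMillis);
        if (buffers.get(key).size() >= expectedCount) {
            flush(key); // all related records collected
        }
    }

    // Called periodically; emits every group whose window has expired, complete or not.
    public void onTimer(long nowMillis) {
        List<K> expired = new ArrayList<>();
        for (Map.Entry<K, Long> e : firstSeen.entrySet()) {
            if (nowMillis - e.getValue() >= windowMillis) {
                expired.add(e.getKey());
            }
        }
        for (K key : expired) {
            flush(key);
        }
    }

    private void flush(K key) {
        List<V> values = buffers.remove(key);
        firstSeen.remove(key);
        emit.accept(key, values);
    }

    public static void main(String[] args) {
        long oneHour = 60L * 60 * 1000;
        RecordCorrelator<String, String> correlator = new RecordCorrelator<>(
                3, oneHour, (key, values) -> System.out.println(key + " -> " + values));
        correlator.onRecord("123", "A", 0L);
        correlator.onRecord("123", "B", 1_000L);
        correlator.onRecord("456", "X", 2_000L);
        correlator.onRecord("123", "C", 3_000L); // prints "123 -> [A, B, C]"
        correlator.onTimer(2_000L + oneHour);    // prints "456 -> [X]" (incomplete, but expired)
    }
}
```

Keeping the buffer as a list per key, rather than a count or sum, is what lets the final object be built from every related record.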
Questions (and thank you for helping):
Sliding-window support for aggregations was added in Apache Kafka 2.7, so a single stream can be grouped and aggregated over a sliding window without a join.
See https://issues.apache.org/jira/browse/KAFKA-5636
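A minimal sketch of such a single-stream sliding-window aggregation, assuming Kafka Streams 3.0 or newer (`SlidingWindows.ofTimeDifferenceAndGrace`; in 2.7 the equivalent factory was `withTimeDifferenceAndGrace`), String keys and values, and hypothetical topic names `input-topic` and `correlated-output`. To keep the serdes trivial, the "collection" of related values is a comma-separated String; a real application would more likely aggregate into a list or a domain object with its own serde.

```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.Suppressed;

public class SlidingWindowCollect {
    // Hypothetical topic names; replace with your own.
    static final String INPUT_TOPIC = "input-topic";
    static final String OUTPUT_TOPIC = "correlated-output";

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
               // One input stream is enough: group by the correlation key (e.g. "123").
               .groupByKey()
               // One-hour sliding window; records up to 5 minutes late are still accepted.
               .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(
                       Duration.ofHours(1), Duration.ofMinutes(5)))
               // Collect every value for the key instead of counting or summing.
               .aggregate(
                       () -> "",
                       (key, value, agg) -> agg.isEmpty() ? value : agg + "," + value,
                       Materialized.with(Serdes.String(), Serdes.String()))
               // Emit only the final result of each window, after the window closes.
               .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
               .toStream()
               // Drop the window from the key; the combined object would be built here.
               .map((windowedKey, values) -> KeyValue.pair(windowedKey.key(), values))
               .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    public static void main(String[] args) {
        System.out.println(buildTopology().describe());
    }
}
```

Note that sliding windows are defined relative to record timestamps, so a key with several records can fall into several overlapping windows and produce several output records. If the requirement is strictly "one hour after the first record for a key", a custom processor with a state store and punctuation may be a closer fit than a windowed aggregation.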