mikemikemike

Reputation: 61

Kafka KStream Related Message Events in Sliding Window

We have a situation in which I think Kafka Streams could help, but I cannot find any documentation or examples that show how.

I found one similar question, but it does not offer any implementation advice: Kafka Streams wait function with depending objects

What I want to do:

I would like to correlate related records from a Kafka topic into a single object and publish that new object to a separate output topic. For example, there might be five message records that are related to each other by a unique key; I want to build a new object from those related records and produce it to a new topic.

I want all related events within a one-hour sliding window to be aggregated. In other words, as soon as a message A with ID “123” arrives at the consumer, the application must wait up to one hour for the remaining records with ID “123” to arrive. Once all records have arrived or one hour has passed, the records expire.

Finally, all related messages collected over the hour are used to create a new object, which is then sent to another Kafka topic.
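The requirement above (release a group once all of its records have arrived, or once one hour has passed since the first one) can be sketched in plain Java, independent of Kafka. This is a minimal illustration under two assumptions that are not in the question: each ID has a known expected record count, and the hour is measured from the first record's timestamp. `CorrelationBuffer`, `add`, and `expire` are hypothetical names, and values are plain Strings for brevity.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Illustrative only: expectedCount and the String payload type are assumptions.
class CorrelationBuffer {
    private static final class Group {
        final Instant firstSeen;                      // timestamp of the first record for this ID
        final List<String> values = new ArrayList<>();
        Group(Instant firstSeen) { this.firstSeen = firstSeen; }
    }

    private final int expectedCount;
    private final Duration timeout;
    private final Map<String, Group> groups = new HashMap<>();

    CorrelationBuffer(int expectedCount, Duration timeout) {
        this.expectedCount = expectedCount;
        this.timeout = timeout;
    }

    // Adds a record; returns the completed group if this record completes it, otherwise null.
    public List<String> add(String id, String value, Instant timestamp) {
        Group g = groups.computeIfAbsent(id, k -> new Group(timestamp));
        g.values.add(value);
        if (g.values.size() >= expectedCount) {
            groups.remove(id);
            return g.values;
        }
        return null;
    }

    // Removes and returns every incomplete group whose timeout has elapsed at time 'now'.
    public Map<String, List<String>> expire(Instant now) {
        Map<String, List<String>> expired = new HashMap<>();
        Iterator<Map.Entry<String, Group>> it = groups.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Group> e = it.next();
            if (!now.isBefore(e.getValue().firstSeen.plus(timeout))) {
                expired.put(e.getKey(), e.getValue().values);
                it.remove();
            }
        }
        return expired;
    }
}
```

In a Kafka Streams application, the map would typically live in a state store and the expiry check would run from a scheduled punctuation in the Processor API, so that pending groups survive restarts.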

Problems I have encountered:

The sliding window in Kafka seems to work only when joining two streams. We will have only one stream connected to the topic; I do not know why two streams would be required or how we would go about implementing this, and I cannot find any examples of this online. All of the stream functions I’ve seen in Kafka simply aggregate or reduce to a simple value when collecting events with the same key, for example counting how many times a key appears or adding up some value.

Here is some pseudo-code describing what I mean. The real function names and semantics will differ if this functionality exists.

    KStream<Key, Object> kstream = kStreamBuilder.stream(TOPIC);
    kstream.windowedBy(
        // One-hour sliding window
    )
    .collectAllRelatedKeys(
        // Collect all records related to each key
        // map == HashMap<Key, ArrayList<Value>>
        map.get(key).add(value);
    )
    .transformAndProcess(
        if (ALL_EVENTS_COLLECTED) {
            // Create a new object from all related records
            newObject = createNewObjectFromRelatedRecordsFunction(map.get(key));
            producer.send(newObject);
        }
    )

Questions (and thank you for helping):

  1. How could I use sliding windows with a single stream?
  2. How do I customize KStream/KTable functions to collect all related events within the time window and produce the new object to another topic?
  3. How does acknowledging / offset management work with sliding window streams?
  4. Could this guarantee exactly-once delivery? For reference: https://www.confluent.io/blog/enabling-exactly-kafka-streams/
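On questions 3 and 4: Kafka Streams commits consumer offsets itself, so application code does not acknowledge individual records, and exactly-once processing is enabled through configuration. A minimal sketch, assuming Kafka Streams 3.0 or newer (where the value `exactly_once_v2` exists) and using the plain property keys that the `StreamsConfig` constants resolve to; `StreamsEosConfig`, the application ID, and the broker address are hypothetical placeholders.

```java
import java.util.Properties;

// Hypothetical helper; assumes Kafka Streams 3.0+ for the "exactly_once_v2" value.
class StreamsEosConfig {
    static Properties build(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);       // also used as the consumer group ID
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Enables transactional processing: input offsets, state changelog writes,
        // and output records are committed together.
        props.setProperty("processing.guarantee", "exactly_once_v2");
        // How often Kafka Streams commits; 100 ms is the default when exactly-once is enabled.
        props.setProperty("commit.interval.ms", "100");
        return props;
    }
}
```

The guarantee only covers what the topology itself reads and writes. Writing results with `to()` keeps them inside the same transaction, whereas sending them through a separate `KafkaProducer`, as the pseudo-code does, would fall outside it.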

Upvotes: 6

Views: 2115

Answers (1)

Matthias J. Sax

Reputation: 62285

Sliding window support for aggregation was added in Apache Kafka 2.7.

See https://issues.apache.org/jira/browse/KAFKA-5636
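A minimal sketch of a single-stream sliding-window aggregation, assuming Kafka Streams 3.0 or newer (on 2.7 and 2.8 the factory method is `SlidingWindows.withTimeDifferenceAndGrace`), String keys carrying the correlation ID, and String values. `SlidingWindowCollector`, the topic names, the five-minute grace period, and the comma-joined aggregate are illustrative assumptions; a real application would aggregate into its own object type with a matching Serde.

```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Windowed;

// Hypothetical example; assumes Kafka Streams 3.0+ and String keys/values.
class SlidingWindowCollector {
    static Topology build(String inputTopic, String outputTopic) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> events =
            builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));

        // One stream is enough: group by the correlation ID (the record key), then
        // aggregate over windows whose records are at most one hour apart.
        KTable<Windowed<String>, String> collected = events
            .groupByKey()
            .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofHours(1), Duration.ofMinutes(5)))
            .aggregate(
                () -> "",                                                      // empty group
                (id, value, acc) -> acc.isEmpty() ? value : acc + "," + value, // append each related value
                Materialized.with(Serdes.String(), Serdes.String()));

        // Emit only the final result of each window once it closes (window end + grace),
        // and drop the window from the key so downstream consumers see the plain ID.
        KStream<String, String> finalGroups = collected
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream()
            .selectKey((windowedId, group) -> windowedId.key());

        finalGroups.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }
}
```

Because a sliding window exists for each distinct set of records lying within one hour of each other, a single ID can yield several overlapping results rather than exactly one. If exactly one output per ID is required ("wait up to one hour after the first record, then emit once"), the Processor API with a state store and a scheduled punctuation may match the requirement more closely.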

Upvotes: 0
