Enigma2684

Reputation: 105

Access TimeWindow properties inside an aggregator in Kafka Streams

With Kafka Streams, I want to emit the latest record of a topic within each time window, and I want the timestamp of the output record to equal the end of the time window the record fell into.

My problem is that I cannot access the window properties inside the aggregator.

Here is the code I have for now:

    KS0
        .groupByKey()
        .windowedBy(
            TimeWindows.of(Duration.ofSeconds(this.periodicity)).grace(Duration.ZERO)
        )
        .aggregate(
            Constants::getInitialAssetTimeValue,
            this::aggregator,
            Materialized.<AssetKey, AssetTimeValue, WindowStore<Bytes, byte[]>>as(this.getStoreName()) /* state store name */
                .withValueSerde(assetTimeValueSerde)   /* serde for aggregate value */
        )
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream()
        .peek((key, value) -> log.info("key={}, value={}", key, value.toString()))
        .to(this.toTopic);

And this is the aggregation function I am using:

    private AssetTimeValue aggregator(AssetKey aggKey, AssetTimeValue newValue, AssetTimeValue aggValue) {

        // I want to do something like this, but it only works with windowed keys,
        // which I do not have access to in the aggregator:
        // windowEndTime = aggKey.window().endTime().getEpochSecond();

        return AssetTimeValue.newBuilder()
                .setTimestamp(windowEndTime)
                .setName(newValue.getName())
                .setValue(newValue.getValue())
                .build();
    }

Many thanks for your help!

Upvotes: 2

Views: 99

Answers (1)

Matthias J. Sax

Reputation: 62330

You can manipulate timestamps only via the Processor API. However, you can easily use the Processor API embedded in the DSL.

For your case, you can insert a transform() between toStream() and to(). After toStream() the key is a Windowed key, so the window is accessible there. Within the Transformer you call context.forward(key, value, To.all().withTimestamp(...)) to set a new timestamp. Additionally, you would return null at the end (returning null means no record is emitted by the return value, as you already use context.forward for this purpose).
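To make this concrete, here is a minimal sketch of such a Transformer. The class name WindowEndTimestampTransformer is made up for illustration, and the sketch assumes a Kafka Streams version in which KStream#transform() and the older ProcessorContext are still available (newer releases deprecate transform() in favor of process()).

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;

/**
 * Hypothetical helper: re-emits each windowed record unchanged, but with the
 * record timestamp set to the end of the window carried in the key.
 */
public class WindowEndTimestampTransformer<K, V>
        implements Transformer<Windowed<K>, V, KeyValue<Windowed<K>, V>> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        // Keep the context so transform() can forward records itself.
        this.context = context;
    }

    @Override
    public KeyValue<Windowed<K>, V> transform(Windowed<K> key, V value) {
        // Window end in epoch milliseconds, taken from the windowed key.
        long windowEndMs = key.window().end();

        // Forward the record downstream with the new timestamp.
        context.forward(key, value, To.all().withTimestamp(windowEndMs));

        // The record was already forwarded, so return null to emit nothing more.
        return null;
    }

    @Override
    public void close() {
        // Nothing to clean up.
    }
}
```

In the topology from the question, this would be wired in as .toStream().transform(() -> new WindowEndTimestampTransformer<>()) before the final .to(...). If the timestamp field inside AssetTimeValue should also carry the window end, the value could be rebuilt inside transform() using key.window().endTime().getEpochSecond() before forwarding.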

Upvotes: 2
