Reputation: 105
I want to stream with Kafka-Streams the latest record of a topic within a time window, and I want to set the timestamp of the output record as being equal to the end of the time window the record was registered on.
My problem is that I cannot access inside the aggregator to the window properties.
Here is the code I have for now :
KS0
.groupByKey()
.windowedBy(
TimeWindows.of(Duration.ofSeconds(this.periodicity)).grace(Duration.ZERO)
)
.aggregate(
Constants::getInitialAssetTimeValue,
this::aggregator,
Materialized.<AssetKey, AssetTimeValue, WindowStore<Bytes, byte[]>>as(this.getStoreName()) /* state store name */
.withValueSerde(assetTimeValueSerde) /* serde for aggregate value */
)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
.peek((key, value) -> log.info("key={}, value={}", key, value.toString()))
.to(this.toTopic);
And the aggregation function I am using is this one :
private AssetTimeValue aggregator(AssetKey aggKey, AssetTimeValue newValue, AssetTimeValue aggValue){
// I want to do something like that, but this only works with windowed Keys to which I do
// not have access through the aggregator
// windowEndTime = aggKey.window().endTime().getEpochSecond();
return AssetTimeValue.newBuilder()
.setTimestamp(windowEndTime)
.setName(newValue.getName())
.setValue(newValue.getValue())
.build();
}
Many thanks for you help !
Upvotes: 2
Views: 99
Reputation: 62330
You can manipulate timestamps only via the Processor API. However, you can easily use the Processor API embedded in the DSL.
For your case, you can insert a transform()
between toStream()
and to()
. Within the Transformer
you call context.forward(key, value, To.all().withTimestamp(...))
to set a new timestamp. Additionally, you would return null
at the end (null
means to not emit any record, as you already use context.forward
for this purpose).
Upvotes: 2