Reputation: 70
I have a Kafka Streams topology, shown below:
stream
    // keep only trades that have a trade time later than todayMillis
    .filter((key, trade) -> trade.tradeTime != null && trade.tradeTime > todayMillis)
    .groupByKey(Grouped.with(Serdes.String(), JSONSerdes.Trade()))
    // tumbling windows sized by the candle resolution, with a one-day grace period
    .windowedBy(TimeWindows.ofSizeAndGrace(
        Duration.ofMinutes(Convertor.getCandleByResolution(resolution)),
        Duration.ofDays(1)))
    .aggregate(
        OHLC::new,
        (key, value, aggregate) -> aggregate.add(value, key),
        Materialized.<String, OHLC, WindowStore<Bytes, byte[]>>as(stateStoreName)
            .withKeySerde(Serdes.String())
            .withValueSerde(JSONSerdes.OHLC())
    );
How can I access the window start time inside the aggregation function? For example, I want to pass the window start time to the add function, but the key is a String. My (key, value) pair is ("IFTTT", OHLC object).
Upvotes: 0
Views: 34
Reputation: 62330
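You cannot. The window is not exposed to the aggregator: it only receives the record key, the record value, and the current aggregate. In general there is no need for this information inside the aggregator code.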
Feel free to file a Jira ticket with a feature request: https://issues.apache.org/jira/browse/KAFKA
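In the meantime, one possible workaround is to derive the window start yourself from the trade's timestamp, since tumbling time windows are aligned to the epoch. The sketch below is plain Java and only illustrates the arithmetic; `WindowStartSketch` and `tumblingWindowStart` are hypothetical names, not part of Kafka Streams. It assumes the window is tumbling (no `advanceBy`), that `tradeTime` is in epoch milliseconds, and that the record timestamp used for windowing equals `tradeTime` (for example via a custom `TimestampExtractor`). If the record timestamp differs, the computed value may not match the window the record was actually assigned to.

```java
import java.time.Duration;

// Hypothetical helper for illustration; not part of the Kafka Streams API.
final class WindowStartSketch {

    // Returns the start (epoch millis) of the epoch-aligned tumbling window
    // of the given size that contains timestampMs.
    static long tumblingWindowStart(long timestampMs, Duration windowSize) {
        long sizeMs = windowSize.toMillis();
        if (sizeMs <= 0) {
            throw new IllegalArgumentException("window size must be positive");
        }
        // Negative timestamps are clamped to 0 before rounding down
        // to a multiple of the window size.
        return (Math.max(0L, timestampMs) / sizeMs) * sizeMs;
    }

    public static void main(String[] args) {
        // Assumption: tradeTime is the timestamp Kafka Streams uses for windowing.
        long tradeTime = 1_700_000_123_456L;
        long windowStart = tumblingWindowStart(tradeTime, Duration.ofMinutes(5));
        System.out.println(windowStart);
    }
}
```

Inside the aggregator this could be called as something like `aggregate.add(value, key, tumblingWindowStart(value.tradeTime, windowSize))`, where the three-argument `add` is also hypothetical. If the start time is only needed on the result rather than during aggregation, note that the table returned by `aggregate` is keyed by `Windowed<String>`, so `windowedKey.window().start()` is available in any downstream step.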
Upvotes: 1