mohammadjavadkh

Reputation: 70

How to access the window start time in a Kafka Streams aggregation function

I have a Kafka Streams topology, shown below:

stream
    .filter(((key, Trade) -> Trade.tradeTime != null && Trade.tradeTime > todayMillis))
    .groupByKey(
        Grouped.with(Serdes.String(), JSONSerdes.Trade())
    )
    .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(Convertor.getCandleByResolution(resolution)), Duration.ofDays(1)))
    .aggregate(
        OHLC::new,
        ((key, value, aggregate) -> aggregate.add(value, key)),
        Materialized.<String, OHLC, WindowStore<Bytes, byte[]>>as(stateStoreName)
            .withKeySerde(Serdes.String())
            .withValueSerde(JSONSerdes.OHLC())
    );

How can I access the window start time inside the aggregation function? For example, I want to pass the window start time to the add function, but the key is a plain String. My (key, value) pair is ("IFTTT", OHLC object).

Upvotes: 0

Views: 34

Answers (1)

Matthias J. Sax

Reputation: 62330

You cannot: the window start time is not exposed to the aggregator. In general, there is no need for this information inside the aggregator code.

Feel free to file a Jira ticket with a feature request: https://issues.apache.org/jira/browse/KAFKA
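Until such a feature exists, one possible workaround is to recompute the window start from the record's own timestamp. Tumbling windows created with TimeWindows (no advanceBy, as in the question) are aligned to the epoch, so the start is the timestamp rounded down to a multiple of the window size. The following is a minimal sketch, not part of the Kafka Streams API: the class name WindowStartSketch and the method windowStart are hypothetical, and the result only matches the real window if the record timestamp used for windowing equals Trade.tradeTime, which depends on the configured timestamp extractor.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper for illustration; not part of Kafka Streams. */
final class WindowStartSketch {

    /**
     * Returns the start (epoch millis) of the epoch-aligned tumbling window
     * that contains timestampMs. Assumes window size equals the advance interval.
     */
    static long windowStart(long timestampMs, Duration windowSize) {
        long sizeMs = windowSize.toMillis();
        if (sizeMs <= 0) {
            throw new IllegalArgumentException("window size must be positive");
        }
        if (timestampMs < 0) {
            throw new IllegalArgumentException("timestamp must be non-negative");
        }
        // Round down to the nearest multiple of the window size.
        return timestampMs - (timestampMs % sizeMs);
    }

    public static void main(String[] args) {
        // Assumed trade time; in the topology this would be value.tradeTime.
        long tradeTime = Instant.parse("2023-01-01T10:07:30Z").toEpochMilli();
        long start = windowStart(tradeTime, Duration.ofMinutes(5));
        System.out.println(Instant.ofEpochMilli(start)); // prints 2023-01-01T10:05:00Z
    }
}
```

Inside the aggregator this could be called with value.tradeTime and the same window size passed to windowedBy. Alternatively, if the start time is only needed downstream of aggregate, the result key is a Windowed<String>, and key.window().start() returns the window start there.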

Upvotes: 1
