Reputation: 21
I have a Kafka Streams application that calculates the high/low/volume of incoming trades. The input topic contains trade messages, and there is no timestamp field in these messages. I create a KGroupedStream from the input stream, grouped by stock symbol as the key.
Then I create a KTable from the KGroupedStream. I aggregate the data, compute the high/low/volume, create a new message called HiLowMessage, and write it to my output stream.
Since the input topic always has data, how can I ensure that the aggregation covers only TODAY's data and does not include yesterday's data? Note that there is no timestamp in the input topic message structure.
Upvotes: 1
Views: 263
Reputation: 62330
Each Kafka message does have a timestamp in its metadata (i.e., in addition to the key and value). This timestamp is usually set by the upstream producer that writes the data into the topic. By default, Kafka Streams uses this record metadata timestamp. Hence, you could do a windowed aggregation with a 1-day hopping TimeWindow (with the advance equal to the window size, so that each day gets its own non-overlapping window).
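To make the idea concrete, here is a minimal plain-Java sketch of what a 1-day, non-overlapping window does to a high/low/volume aggregation. It is not Kafka Streams code: the class `DailyHiLow`, the nested `HiLow` type, and the methods `onTrade`, `get`, and `windowStart` are hypothetical names made up for illustration, and the `timestampMs` argument stands in for the record metadata timestamp. In an actual topology you would likely express the same thing with `windowedBy(...)` on the KGroupedStream and a `TimeWindows` of one day. Note that the buckets here are aligned to the epoch, so a "day" means a UTC day, which may not match your trading day.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a windowed aggregation; not part of any Kafka API.
class DailyHiLow {
    static final long DAY_MS = Duration.ofDays(1).toMillis();

    /** Running aggregate for one symbol within one day window. */
    static final class HiLow {
        double high = Double.NEGATIVE_INFINITY;
        double low = Double.POSITIVE_INFINITY;
        long volume = 0;

        void add(double price, long size) {
            high = Math.max(high, price);
            low = Math.min(low, price);
            volume += size;
        }
    }

    // One aggregate per (symbol, window start) pair, keyed as "SYMBOL@startMs".
    private final Map<String, HiLow> state = new HashMap<>();

    /** Start of the epoch-aligned (UTC) day window that contains the timestamp. */
    static long windowStart(long timestampMs) {
        return Math.floorDiv(timestampMs, DAY_MS) * DAY_MS;
    }

    /** Folds one trade into the aggregate of the day its timestamp falls in. */
    HiLow onTrade(String symbol, double price, long size, long timestampMs) {
        String key = symbol + "@" + windowStart(timestampMs);
        HiLow agg = state.computeIfAbsent(key, k -> new HiLow());
        agg.add(price, size);
        return agg;
    }

    /** Looks up the aggregate for a symbol on the day containing the timestamp. */
    HiLow get(String symbol, long timestampMs) {
        return state.get(symbol + "@" + windowStart(timestampMs));
    }

    public static void main(String[] args) {
        DailyHiLow app = new DailyHiLow();
        long yesterday = Instant.parse("2024-01-01T15:00:00Z").toEpochMilli();
        long today = Instant.parse("2024-01-02T09:30:00Z").toEpochMilli();

        app.onTrade("ACME", 10.0, 100, yesterday);
        app.onTrade("ACME", 12.0, 50, yesterday + 1_000);
        app.onTrade("ACME", 11.0, 25, today);

        // Today's aggregate is kept separately from yesterday's.
        HiLow t = app.get("ACME", today);
        System.out.println(t.high + " / " + t.low + " / " + t.volume);
    }
}
```

Because the window start is part of the key, trades from yesterday and today never land in the same aggregate, even though both are still present in the input topic.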
Upvotes: 2