Krishna Kalyan

How to get the latest value within a time window

This is what my streaming data looks like:

time | id | group
---- | ---| ---
1    | a1 | b1
2    | a1 | b2
3    | a1 | b3
4    | a2 | b3

Assume all of the records above fall within a single window. For each distinct id, I want the latest record (the one with the largest time).

I need the output to look like this:

time | id | group
---- | ---| ---
3    | a1 | b3
4    | a2 | b3

How can I achieve this in Flink?

I am aware of WindowFunction, but I can't wrap my head around how to use it for this.

I have tried this just to get the distinct ids. How can I extend this function to my use case?

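import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Emits each distinct id seen in the window (UserMessage is defined elsewhere)
class DistinctGrid extends WindowFunction[UserMessage, String, Tuple, TimeWindow] {
  override def apply(key: Tuple, window: TimeWindow, input: Iterable[UserMessage], out: Collector[String]): Unit = {
    val distinctIds = input.map(_.id).toSet
    for (id <- distinctIds) {
      out.collect(id)
    }
  }
}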
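One way to extend DistinctGrid: instead of collapsing the ids into a Set, group the window's contents by id and keep the newest record for each group. The sketch below pulls that logic out into a plain Scala function so it runs without Flink. The UserMessage fields (time, id, group) are an assumption taken from the table, since the real class isn't shown. Inside `apply` you would change the output type from String to UserMessage and call `latestPerId(input).foreach(out.collect)`.

```scala
// Hypothetical stand-in for the question's UserMessage; the fields are assumed
// from the example table, since the real class definition isn't shown.
case class UserMessage(time: Long, id: String, group: String)

object LatestPerId {
  // For every distinct id in one window's contents, keep the message with the
  // largest time. Sorting by time only makes the output order deterministic.
  def latestPerId(input: Iterable[UserMessage]): Seq[UserMessage] =
    input
      .groupBy(_.id)
      .values
      .map(_.maxBy(_.time))
      .toSeq
      .sortBy(_.time)

  def main(args: Array[String]): Unit = {
    // The example window from the question
    val window = Seq(
      UserMessage(1, "a1", "b1"),
      UserMessage(2, "a1", "b2"),
      UserMessage(3, "a1", "b3"),
      UserMessage(4, "a2", "b3")
    )
    latestPerId(window).foreach(println)
  }
}
```

Running `main` prints `UserMessage(3,a1,b3)` and `UserMessage(4,a2,b3)`, matching the desired output.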

Answers (1)

David Anderson

If you key the stream by the id field, there's no need to think about distinct ids: you'll have a separate window for each distinct key. Your window function just needs to iterate over the window contents to find the UserMessage with the largest timestamp, and output that as the result of the window (for that key). However, there's a built-in function that does just that (see the documentation for maxBy()), so you don't need a window function in this case. Note that maxBy, unlike max, returns the whole element that has the largest value in that field, which is what you want here since you also need the group column.
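With the stream keyed by id, the window function only has to scan one key's contents. Here is a minimal plain-Scala sketch of that scan (not Flink API code), again assuming UserMessage has the fields from the table; in Flink it would be the body of `apply`, followed by `out.collect(...)`. It relies on a window never being empty, which holds because Flink creates a window only when the first element for it arrives.

```scala
// Hypothetical stand-in for the question's UserMessage (fields assumed from the table)
case class UserMessage(time: Long, id: String, group: String)

object LatestInKeyedWindow {
  // Scan one key's window contents, remembering the newest message seen so far.
  // On equal times the earlier-seen message is kept. Throws on an empty input.
  def latest(input: Iterable[UserMessage]): UserMessage =
    input.reduceLeft((newest, m) => if (m.time > newest.time) m else newest)

  def main(args: Array[String]): Unit = {
    // The window for key "a1" from the question's example
    val a1Window = Seq(
      UserMessage(1, "a1", "b1"),
      UserMessage(2, "a1", "b2"),
      UserMessage(3, "a1", "b3")
    )
    println(latest(a1Window)) // prints UserMessage(3,a1,b3)
  }
}
```

Writing this by hand only pays off if you need logic beyond "largest time wins"; otherwise maxBy does the same job.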

Roughly speaking, then, this will look like:

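import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// stream is a DataStream[UserMessage]; one 10-minute window per distinct id,
// emitting the record with the largest "time" in each window
stream.keyBy("id")
  .timeWindow(Time.minutes(10))
  .maxBy("time")
  .print()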
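To see what such a pipeline emits, here is a plain Scala model of it over a finite list (a sketch, not Flink code). It assumes tumbling windows aligned to time 0, as Flink's tumbling windows are with the default zero offset, and non-negative times in the same unit as the window size. The record at time 12 is a hypothetical addition to the question's data, included to show that each (id, window) pair yields its own result, so an id seen in two windows is emitted twice.

```scala
// Hypothetical stand-in for the question's UserMessage (fields assumed from the table)
case class UserMessage(time: Long, id: String, group: String)

object KeyedWindowMaxBy {
  // Start of the tumbling window containing t, for windows aligned to 0 (t >= 0 assumed)
  def windowStart(t: Long, size: Long): Long = t - (t % size)

  // Models keyBy(id) + tumbling window + maxBy(time): one result per (id, window) pair
  def run(events: Seq[UserMessage], size: Long): Seq[UserMessage] =
    events
      .groupBy(e => (e.id, windowStart(e.time, size)))
      .values
      .map(_.maxBy(_.time))
      .toSeq
      .sortBy(m => (m.time, m.id))

  def main(args: Array[String]): Unit = {
    val events = Seq(
      UserMessage(1, "a1", "b1"),
      UserMessage(2, "a1", "b2"),
      UserMessage(3, "a1", "b3"),
      UserMessage(4, "a2", "b3"),
      UserMessage(12, "a1", "b4") // hypothetical: falls into the next window when size = 10
    )
    run(events, 10).foreach(println)
  }
}
```

With a window size of 10, this prints `UserMessage(3,a1,b3)`, `UserMessage(4,a2,b3)` and `UserMessage(12,a1,b4)`.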
