Reputation: 1702
This is what my streaming data looks like:
time | id | group
---- | ---| ---
1 | a1 | b1
2 | a1 | b2
3 | a1 | b3
4 | a2 | b3
Assume all of the records above fall within a single window. For each distinct id, I need only the latest record.
I need the output to be like below:
time | id | group
---- | ---| ---
3 | a1 | b3
4 | a2 | b3
How can I achieve this in Flink?
I am aware of the window function WindowFunction. However, I cannot wrap my head around how to use it for this.
I have tried this just to get the distinct ids. How can I extend this function to my use case?
    class DistinctGrid extends WindowFunction[UserMessage, String, Tuple, TimeWindow] {
      override def apply(key: Tuple, window: TimeWindow, input: Iterable[UserMessage], out: Collector[String]): Unit = {
        val distinctGeo = input.map(_.id).toSet
        for (i <- distinctGeo) {
          out.collect(i)
        }
      }
    }
Upvotes: 0
Views: 666
Reputation: 43707
If you key the stream by the id field, there is no need to think about distinct ids: you will have a separate window for each distinct key. Your window function just needs to iterate over the window contents to find the UserMessage with the largest timestamp and output that as the result of the window (for that key). However, there is a built-in function that does just that (see the documentation for maxBy()), so no window function is needed in this case.
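For reference, the window-function route described above might look roughly like the sketch below. It assumes UserMessage is a case class with time, id and group fields (the question does not show its definition) and that the stream is keyed with keyBy("id"), which is why the key type is Tuple. The class name LatestPerId is made up for illustration.

```scala
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Assumed record shape; the question does not show the real UserMessage.
case class UserMessage(time: Long, id: String, group: String)

// Hypothetical window function: emits the most recent UserMessage of each keyed window.
class LatestPerId extends WindowFunction[UserMessage, UserMessage, Tuple, TimeWindow] {
  override def apply(key: Tuple, window: TimeWindow, input: Iterable[UserMessage], out: Collector[UserMessage]): Unit = {
    // The stream is keyed by id, so every element in this window shares one id.
    // Guard against an empty window before taking the maximum.
    if (input.nonEmpty) {
      out.collect(input.maxBy(_.time))
    }
  }
}
```

It would be attached with something like stream.keyBy("id").timeWindow(Time.minutes(10)).apply(new LatestPerId), where stream is assumed to be a DataStream[UserMessage]. With the built-in maxBy() none of this is needed.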
Roughly speaking then, this will look like
    stream.keyBy("id")
      .timeWindow(Time.minutes(10))
      .maxBy("time")
      .print()
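To sanity-check the idea against the sample data from the question, here is a small sketch in plain Scala collections. It is not Flink code: groupBy stands in for keyBy("id") and the whole Seq stands in for one window. UserMessage is again an assumed case class, and LatestPerIdCheck and latestPerId are names invented for this illustration.

```scala
// Assumed record shape; the question does not show the real UserMessage.
case class UserMessage(time: Long, id: String, group: String)

object LatestPerIdCheck {
  // The four records from the question, treated as the contents of one window.
  val window: Seq[UserMessage] = Seq(
    UserMessage(1, "a1", "b1"),
    UserMessage(2, "a1", "b2"),
    UserMessage(3, "a1", "b3"),
    UserMessage(4, "a2", "b3")
  )

  // groupBy plays the role of keyBy("id"); maxBy picks the latest record per key.
  // The final sort only makes the output order deterministic.
  def latestPerId(records: Seq[UserMessage]): Seq[UserMessage] =
    records.groupBy(_.id).values.map(_.maxBy(_.time)).toSeq.sortBy(_.time)

  def main(args: Array[String]): Unit =
    latestPerId(window).foreach(m => println(s"${m.time} | ${m.id} | ${m.group}"))
}
```

Running main prints the two rows the question asks for: 3 | a1 | b3 and 4 | a2 | b3.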
Upvotes: 1