From a time-windowed, keyed stream, I'd like to get a stream of the largest window seen so far (largest in terms of element count).
Currently I have the following code:
    source
      .keyBy(...)
      .timeWindow(...)
      .fold((DummyKey, 0)) { case ((_, current), key) => (key, current + 1) }
      .keyBy(_ => ())
      .maxBy(1)
The result of the fold is a stream of (key, count) elements. From this stream, I want to get a stream of updates of the "key with the highest count".
I then key by a constant (keyBy(_ => ()), since this is a global operation) and use maxBy. This almost works: I'm getting a stream of highest counts, but the current highest count is emitted again for every incoming element.
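To make the behavior described above concrete, here is a plain Scala model (no Flink involved) of what a rolling maxBy(1) emits: one output per input, namely the element with the highest count so far. The object name RollingMaxBySketch is hypothetical, and the sketch assumes that on ties the earlier element is kept.

```scala
// Plain Scala model (no Flink) of a rolling maxBy on the count field.
object RollingMaxBySketch {

  // For every input element, emit the element with the highest count seen so
  // far. Assumption: on equal counts the earlier element is kept.
  def run(input: Seq[(Int, Int)]): Seq[(Int, Int)] =
    if (input.isEmpty) Seq.empty
    else
      input.tail.scanLeft(input.head) { (best, in) =>
        if (in._2 > best._2) in else best
      }
}
```

For the input (1,4), (2,2), (3,7), (1,7) this yields (1,4), (1,4), (3,7), (3,7): the same maximum is repeated for every element that does not beat it, which is exactly the duplication in question.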
I think what I'm looking for is some kind of filter with access to the previous value, which only emits an element when the new value differs from the previous one.
Is that possible in Flink currently?
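The "filter with previous value" described above can be sketched in plain Scala (no Flink) as a step function that takes the incoming element and the previously seen value and returns what to emit plus the new state. The names ChangeFilterSketch, step and run are hypothetical; the (input, Option[state]) => (output, Option[state]) shape is chosen to resemble a stateful FlatMap, but this is only a local model under that assumption.

```scala
// Plain Scala sketch (no Flink): emit an element only if it differs from the
// previously seen element.
object ChangeFilterSketch {

  // One filter step: `previous` is None before the first element.
  // Returns the elements to emit and the state to keep.
  def step[A](in: A, previous: Option[A]): (Seq[A], Option[A]) =
    if (previous.contains(in)) (Seq.empty, previous) // unchanged: emit nothing
    else (Seq(in), Some(in))                         // changed: emit and remember

  // Run the filter over a finite sequence, threading the state through.
  def run[A](input: Seq[A]): Seq[A] =
    input.foldLeft((Vector.empty[A], Option.empty[A])) {
      case ((emitted, state), in) =>
        val (out, next) = step(in, state)
        (emitted ++ out, next)
    }._1
}
```

Applied to a repetitive rolling-max output such as (1,4), (1,4), (3,7), (3,7), run returns only (1,4), (3,7).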
Flink does not provide such a filter out of the box, but it is straightforward to implement one yourself.
You can do this with a stateful FlatMap similar to this one:
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time

    val source: DataStream[Int] = ???
    source
      // key by the value itself
      .keyBy((i: Int) => i)
      .timeWindow(Time.minutes(10))
      // count the elements of each window, producing (key, count)
      .fold((1, 0)) { case ((_, current), key) => (key, current + 1) }
      // move everything to the same key
      .keyBy(_ => 0)
      // use stateful flatmap to remember highest count and filter by that
      .flatMapWithState((in: (Int, Int), state: Option[Int]) =>
        // filter condition
        if (in._2 > state.getOrElse(-1))
          // emit new value and update max count
          (Seq(in), Some(in._2))
        else
          // emit nothing (empty Seq) and keep count
          (Seq.empty, state)
      ).setParallelism(1)
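The function passed to flatMapWithState above is a pure (input, state) => (output, newState) function, so its filtering behavior can be checked without a Flink job. This sketch copies that logic into a hypothetical MaxCountFilterSketch object and threads the state through a fixed sequence of (key, count) pairs, the way Flink would for a single key.

```scala
// Plain Scala check (no Flink) of the flatMapWithState logic above.
object MaxCountFilterSketch {

  // Same condition as the Flink lambda: emit only if the count is strictly
  // greater than the highest count seen so far (-1 before any element).
  def step(in: (Int, Int), state: Option[Int]): (Seq[(Int, Int)], Option[Int]) =
    if (in._2 > state.getOrElse(-1)) (Seq(in), Some(in._2))
    else (Seq.empty, state)

  // Feed a sequence of (key, count) pairs through `step`, keeping the state.
  def run(input: Seq[(Int, Int)]): Seq[(Int, Int)] =
    input.foldLeft((Vector.empty[(Int, Int)], Option.empty[Int])) {
      case ((emitted, state), in) =>
        val (out, next) = step(in, state)
        (emitted ++ out, next)
    }._1
}
```

For (1,4), (2,2), (3,7), (1,7), (2,9) only (1,4), (3,7), (2,9) are emitted. Because the comparison is strict, a window that merely ties the current maximum, like (1,7), does not produce an update.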
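If the non-parallel (single-threaded) filter operator becomes a bottleneck, you can add a parallel pre-filter in front of it: a keyBy with random keys followed by a stateful filter FlatMap with higher parallelism. Each pre-filter instance drops elements that do not exceed its own local maximum, so far fewer elements reach the single global filter.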
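The two-stage idea can be simulated in plain Scala (no Flink) to see why it is safe: any element that raises the global maximum also raises the maximum of its own pre-filter partition, so it always reaches the global filter. One deviation is deliberate: instead of truly random keys, this sketch assigns each element to a bucket with a deterministic hash of its key, on the assumption that Flink's keyBy expects a key selector to return the same key for the same element. TwoStageFilterSketch and bucket are hypothetical names; in a Flink job the buckets would correspond to a keyBy on the bucket number plus a flatMapWithState with higher parallelism, followed by the existing keyBy(_ => 0) stage.

```scala
// Plain Scala simulation (no Flink) of a parallel pre-filter followed by the
// single global max-count filter.
object TwoStageFilterSketch {

  // Running-max filter step, same logic as the flatMapWithState lambda.
  def step(in: (Int, Int), state: Option[Int]): (Seq[(Int, Int)], Option[Int]) =
    if (in._2 > state.getOrElse(-1)) (Seq(in), Some(in._2)) else (Seq.empty, state)

  // Hypothetical deterministic bucket choice: hash of the window key.
  def bucket(in: (Int, Int), buckets: Int): Int =
    Math.floorMod(in._1.hashCode, buckets)

  def run(input: Seq[(Int, Int)], buckets: Int): Seq[(Int, Int)] = {
    val local = Array.fill[Option[Int]](buckets)(None) // one state per pre-filter bucket
    var global: Option[Int] = None                      // state of the global filter
    val out = Vector.newBuilder[(Int, Int)]
    for (in <- input) {
      val b = bucket(in, buckets)
      // stage 1: local pre-filter for this bucket
      val (passed, nextLocal) = step(in, local(b))
      local(b) = nextLocal
      // stage 2: the single global filter sees only what passed stage 1
      for (candidate <- passed) {
        val (emitted, nextGlobal) = step(candidate, global)
        global = nextGlobal
        out ++= emitted
      }
    }
    out.result()
  }
}
```

With four buckets and one bucket the emitted updates are identical; the only difference is how many elements the global stage has to look at.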