adamw

Reputation: 8636

Stream of updates to the largest time window in Flink

From a time-windowed, keyed stream I'd like to get a stream of the largest window seen so far (largest in terms of count of elements).

Currently I have the following code:

source
  .keyBy(...)
  .timeWindow(...)
  .fold((DummyKey, 0)) { case ((_, current), key) => (key, current + 1) }
  .keyBy(_ => ())
  .maxBy(1)

The result of the fold is a stream of (key, count) elements - so from this stream, I want to get a stream of updates of the "key with highest count".

I then key by a constant (keyBy(_ => ()), since this is a global operation) and use maxBy. This almost works: I get a stream of the highest count, but the current highest count is re-emitted for every incoming element.

I think what I'm looking for is some kind of filter-with-previous-value, which would only emit an element when the new value differs from the previous one.

Is that possible in Flink currently?

Upvotes: 1

Views: 500

Answers (1)

Fabian Hueske

Reputation: 18987

Flink does not feature such a filter by default, but it should be rather easy to implement one yourself.

You can do this with a stateful FlatMap similar to this one:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

val source: DataStream[Int] = ???

source
  .keyBy(x => x)
  .timeWindow(Time.minutes(10))
  // count elements per key and window; (1, 0) is a dummy initial (key, count)
  .fold((1, 0)) { case ((_, current), key) => (key, current + 1) }
  // move everything to the same key
  .keyBy(_ => 0) 
  // use stateful flatmap to remember highest count and filter by that
  .flatMapWithState( (in, state: Option[Int]) => 
    // filter condition
    if (in._2 > state.getOrElse(-1)) 
      // emit new value and update max count
      (Seq(in), Some(in._2)) 
    else 
      // emit nothing (empty Seq()) and keep count
      (Seq(), state)
  ).setParallelism(1)

If the non-parallel (single-threaded) filter operator becomes a bottleneck, you can add a parallel pre-filter: key by random keys and apply a stateful filter FlatMap with higher parallelism before the global filter, as sketched below.
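A minimal sketch of such a pre-filter, assuming the (key, count) stream from the fold above is called counts and picking an arbitrary pre-filter parallelism of 4 (the name counts and the value 4 are illustrative, not part of the original answer):

import org.apache.flink.streaming.api.scala._
import scala.util.Random

val counts: DataStream[(Int, Int)] = ???  // the (key, count) stream produced by the fold
val preFilterParallelism = 4              // assumed value, tune to your job

val preFiltered = counts
  // attach a random partition id so the pre-filter can run in parallel
  .map(kc => (Random.nextInt(preFilterParallelism), kc))
  .keyBy(_._1)
  // each parallel instance keeps its own local maximum and drops everything below it
  .flatMapWithState[(Int, Int), Int] { (in: (Int, (Int, Int)), localMax: Option[Int]) =>
    val kc = in._2
    if (kc._2 > localMax.getOrElse(-1))
      (Seq(kc), Some(kc._2))   // new local maximum: forward and remember it
    else
      (Seq(), localMax)        // below the local maximum: drop
  }

// the non-parallel global filter from above then runs on the much smaller pre-filtered stream
preFiltered
  .keyBy(_ => 0)
  .flatMapWithState[(Int, Int), Int] { (in: (Int, Int), globalMax: Option[Int]) =>
    if (in._2 > globalMax.getOrElse(-1)) (Seq(in), Some(in._2))
    else (Seq(), globalMax)
  }.setParallelism(1)

The pre-filter only guarantees that each parallel instance forwards increasing counts, so the global filter is still needed to produce the final stream of maxima; it just receives far fewer elements.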

Upvotes: 2
