harish maiya

Flink streaming: Get the top n elements for each time window

I have a DataStream of (Float, String) tuples, and I would like to sort it and pick the three largest values for each (fixed) time window. The stream is windowed by processing time, and the sorting should follow the natural order.

Using Flink 1.0.1, these are my attempts:

    val topTasks = new mutable.PriorityQueue[(Float, String)](Ordering.Tuple2.reverse) //Ex:(5250, "mytask")
    //Get stream and other operations ...
    val sortMetricStream = metricStream
      .map { metrics =>
        topTasks.enqueue(metrics._1, metrics._2)
      }
      .timeWindowAll(Time.seconds(10))
      .reduce({ (topTasks.dequeue()._2, topTasks.dequeue()._2, topTasks.dequeue()._2)
      })

and

    val sortMetricStream = metricStream
      .timeWindowAll(Time.seconds(10))
      .partitionByRange(0)
      .sortPartition(0, Order.DESCENDING)

In neither case does sortMetricStream give me the expected task names.

Any help on this would be greatly appreciated.

Answers (1)

Matthias J. Sax

Use apply(...) instead of reduce(...) (see https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/index.html#datastream-transformations)

By using the Iterable passed to AllWindowFunction#apply() (WindowFunction#apply() for keyed windows), you can buffer all records of the window internally (for example, in a list), sort the list, and finally emit your result. You can call Collector#collect() zero, one, or multiple times.
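A minimal sketch of that approach for the question's (Float, String) stream, assuming the Flink 1.x Scala DataStream API (the Scala AllWindowFunction trait and AllWindowedStream#apply, as in the release-1.1 docs linked above). The names TopNTaskNames, topN and topTaskNames are hypothetical, chosen only for illustration, and metricStream is assumed to be an existing DataStream[(Float, String)] as in the question.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Hypothetical window function: emits the names of the n largest records of each window.
class TopNTaskNames(n: Int)
    extends AllWindowFunction[(Float, String), String, TimeWindow] {

  override def apply(
      window: TimeWindow,
      input: Iterable[(Float, String)],
      out: Collector[String]): Unit = {
    // collect() is called once per selected record (zero times for an empty window).
    TopNTaskNames.topN(input, n).foreach(name => out.collect(name))
  }
}

object TopNTaskNames {
  // Buffer the window's records in a list, sort by value (largest first),
  // and keep the names of the first n. Sorting is stable, so ties keep arrival order.
  def topN(input: Iterable[(Float, String)], n: Int): List[String] =
    input.toList.sortBy(_._1)(Ordering[Float].reverse).take(n).map(_._2)
}

object TopTasks {
  // Hypothetical wiring: 10-second processing-time windows, top 3 per window.
  def topTaskNames(metricStream: DataStream[(Float, String)]): DataStream[String] =
    metricStream
      .timeWindowAll(Time.seconds(10))
      .apply(new TopNTaskNames(3))
}
```

For a window containing (5250, "mytask"), (120, "a"), (9000, "b") and (700, "c"), TopNTaskNames.topN(records, 3) returns List("b", "mytask", "c"), so the window emits b, mytask and c. The whole window is held in memory before sorting, which is the cost of buffering; reduce(...) cannot express this directly because its function always combines two elements into one element of the same type.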
