Reputation: 17
I have a DataStream of (Float, String) tuples that I would like to sort, picking the three largest values in each (fixed) time window. The stream is windowed by processing time, and the sort is by natural order.
Using Flink 1.0.1, here are my attempts:
val topTasks = new mutable.PriorityQueue[(Float, String)](Ordering.Tuple2.reverse) //Ex:(5250, "mytask")
//Get stream and other operations ...
val sortMetricStream = metricStream
  .map { metrics =>
    topTasks.enqueue(metrics._1, metrics._2)
  }
  .timeWindowAll(Time.seconds(10))
  .reduce({ (topTasks.dequeue()._2, topTasks.dequeue()._2, topTasks.dequeue()._2)
  })
and
val sortMetricStream = metricStream
  .timeWindowAll(Time.seconds(10))
  .partitionByRange(0)
  .sortPartition(0, Order.DESCENDING)
In either case, sortMetricStream does not give me the expected task names.
Any help on this would be greatly appreciated.
Upvotes: 2
Views: 2330
Reputation: 62330
Use apply(...) instead of reduce(...) (see https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/index.html#datastream-transformations).
By using the iterator of WindowFunction#apply(), you can buffer all records of the window internally (for example, in a list), then sort the list, and finally emit your result. You can call Collector#collect() zero, one, or multiple times.
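A minimal sketch of that buffer, sort, and emit logic in plain Scala, written so it runs without Flink. Collector and ListCollector below are hypothetical stand-ins for Flink's org.apache.flink.util.Collector, and TopThree.applyWindow is an illustrative name. In a real job, the same body would go inside the function passed to apply(...) on the stream returned by timeWindowAll(...); check the exact apply signature for your Flink version.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for Flink's org.apache.flink.util.Collector,
// so this sketch runs without a Flink dependency.
trait Collector[T] {
  def collect(record: T): Unit
}

// Demo collector that simply remembers everything emitted to it.
class ListCollector[T] extends Collector[T] {
  val emitted: ListBuffer[T] = ListBuffer.empty[T]
  def collect(record: T): Unit = emitted += record
}

object TopThree {
  // Illustrative body of a window function: buffer all records of the
  // window, sort them by the Float value (largest first), emit the top 3.
  def applyWindow(input: Iterable[(Float, String)],
                  out: Collector[(Float, String)]): Unit = {
    val buffered: List[(Float, String)] = input.toList // buffer the window's records
    val sorted = buffered.sortWith(_._1 > _._1)        // descending by value
    sorted.take(3).foreach(out.collect)                // emit zero to three records
  }
}
```

Because the function emits through the collector rather than returning a single value, a window with fewer than three records simply produces fewer outputs, which reduce(...) cannot express. Sorting the whole buffer is the simplest approach; for very large windows, tracking only the three largest values while iterating would avoid the full sort.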
Upvotes: 2