Reputation: 366
I'm stuck trying to write custom logic for a window's apply() method. Basically, I want to reduce all elements in a window and then append a timestamp to the result. I created a WindowedStream from a DataStream, but when I try to pass my functions to apply(), it fails at compile time.
This is the code:
class WindowReduceFunction extends ReduceFunction[(Int, String, Int)] {
  override def reduce(a: (Int, String, Int), b: (Int, String, Int)) : (Int, String, Int) = {
    (a._1, a._2, a._3 + b._3)
  }
}

class WindowTimestampAddFunction extends WindowFunction[(Int, String, Int), (Int, String, Int, Long), (Int, String), TimeWindow] {
  override def apply(key : (Int, String), window : Window, in: Iterable[(Int, String, Int)], out: Collector[(Int, String, Int, Long)]) {
    for(row <- in) {
      out.collect((row._1, row._2, row._3, window.maxTimestamp()))
    }
  }
}
The DataStream is of type (Int, String, Int) and the key is (Int, String). Without apply() the code compiles and runs without errors, but when I add:
myWindowedStream.apply(new WindowReduceFunction(), new WindowTimestampAddFunction())
compilation fails with this error:
[ERROR] [R](preAggregator: ((Int, String, Int), (Int, String, Int)) => (Int, String, Int), windowFunction: (org.apache.flink.api.java.tuple.Tuple, org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[(Int, String, Int)], org.apache.flink.util.Collector[R]) => Unit)(implicit evidence$6: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R] <and>
[ERROR] [R](preAggregator: org.apache.flink.api.common.functions.ReduceFunction[(Int, String, Int)], function: org.apache.flink.streaming.api.scala.function.WindowFunction[(Int, String, Int),R,org.apache.flink.api.java.tuple.Tuple,org.apache.flink.streaming.api.windowing.windows.TimeWindow])(implicit evidence$5: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
[ERROR] cannot be applied to (WindowReduceFunction, WindowTimestampAddFunction)
[ERROR] .apply(new WindowReduceFunction(), new WindowTimestampAddFunction())
[ERROR] ^
[ERROR] one error found
Upvotes: 3
Views: 1181
Reputation: 986
You are using either an index position key, as in keyBy(1), or a field expression key, as in keyBy("field"). This means that the key type of the WindowedStream is Tuple (org.apache.flink.api.java.tuple.Tuple, to be specific).
If you change the third generic argument of your WindowFunction from (Int, String) to Tuple (and the type of the key parameter in apply() to match), it should work. You can also change your keyBy call to use a lambda function; then you get the correct, specific key type in your WindowedStream. For example: keyBy(in => (in._1, in._2)).
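To make the two options concrete, here is a minimal sketch. It assumes the Flink version from the question, where the Scala WindowedStream.apply() accepts a ReduceFunction plus a WindowFunction (as the error message shows) and where timeWindow() is available. The names KeyTypeSketch, TupleKeyTimestampFunction, TypedKeyTimestampFunction, withIndexKeys and withLambdaKey, as well as the 5-second window, are made up for illustration and are not from the question.

```scala
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Same pre-aggregation as in the question: sum the third field.
class WindowReduceFunction extends ReduceFunction[(Int, String, Int)] {
  override def reduce(a: (Int, String, Int), b: (Int, String, Int)): (Int, String, Int) =
    (a._1, a._2, a._3 + b._3)
}

// Option 1: keep index-based keys, so the key type is Flink's Java Tuple.
class TupleKeyTimestampFunction
    extends WindowFunction[(Int, String, Int), (Int, String, Int, Long), Tuple, TimeWindow] {
  override def apply(key: Tuple, window: TimeWindow,
                     in: Iterable[(Int, String, Int)],
                     out: Collector[(Int, String, Int, Long)]): Unit =
    for (row <- in) out.collect((row._1, row._2, row._3, window.maxTimestamp()))
}

// Option 2: key with a lambda, so the key type is the Scala tuple (Int, String).
class TypedKeyTimestampFunction
    extends WindowFunction[(Int, String, Int), (Int, String, Int, Long), (Int, String), TimeWindow] {
  override def apply(key: (Int, String), window: TimeWindow,
                     in: Iterable[(Int, String, Int)],
                     out: Collector[(Int, String, Int, Long)]): Unit =
    for (row <- in) out.collect((row._1, row._2, row._3, window.maxTimestamp()))
}

object KeyTypeSketch {
  // Index keys: the WindowedStream is keyed by org.apache.flink.api.java.tuple.Tuple.
  def withIndexKeys(stream: DataStream[(Int, String, Int)]): DataStream[(Int, String, Int, Long)] =
    stream
      .keyBy(0, 1)
      .timeWindow(Time.seconds(5)) // window size is an arbitrary assumption
      .apply(new WindowReduceFunction(), new TupleKeyTimestampFunction())

  // Lambda key: the WindowedStream is keyed by (Int, String).
  def withLambdaKey(stream: DataStream[(Int, String, Int)]): DataStream[(Int, String, Int, Long)] =
    stream
      .keyBy(in => (in._1, in._2))
      .timeWindow(Time.seconds(5)) // window size is an arbitrary assumption
      .apply(new WindowReduceFunction(), new TypedKeyTimestampFunction())
}
```

In both variants the third type parameter of the WindowFunction has to match the key type that keyBy produces; the lambda variant is usually nicer because the key fields keep their static types.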
Upvotes: 3