midnight1247

Can't apply() custom functions to a WindowedStream on Flink

I'm stuck trying to code custom logic for the window's apply() method. Basically, I want to reduce all elements in a window and then append a timestamp to the result. I created a WindowedStream from a DataStream, but when I pass my functions to apply() the code fails to compile.

This is the code:

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

class WindowReduceFunction extends ReduceFunction[(Int, String, Int)] {
  override def reduce(a: (Int, String, Int), b: (Int, String, Int)): (Int, String, Int) = {
    (a._1, a._2, a._3 + b._3)
  }
}

class WindowTimestampAddFunction extends WindowFunction[(Int, String, Int), (Int, String, Int, Long), (Int, String), TimeWindow] {
  override def apply(key: (Int, String), window: TimeWindow, in: Iterable[(Int, String, Int)], out: Collector[(Int, String, Int, Long)]): Unit = {
    for (row <- in) {
      out.collect((row._1, row._2, row._3, window.maxTimestamp()))
    }
  }
}

The DataStream is of type (Int, String, Int) and the key is (Int, String). Without the apply() call the code compiles and runs without errors, but when I add:

myWindowedStream.apply(new WindowReduceFunction(), new WindowTimestampAddFunction())

compilation fails with this error:

[ERROR]   [R](preAggregator: ((Int, String, Int), (Int, String, Int)) => (Int, String, Int), windowFunction: (org.apache.flink.api.java.tuple.Tuple, org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[(Int, String, Int)], org.apache.flink.util.Collector[R]) => Unit)(implicit evidence$6: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R] <and>
[ERROR]   [R](preAggregator: org.apache.flink.api.common.functions.ReduceFunction[(Int, String, Int)], function: org.apache.flink.streaming.api.scala.function.WindowFunction[(Int, String, Int),R,org.apache.flink.api.java.tuple.Tuple,org.apache.flink.streaming.api.windowing.windows.TimeWindow])(implicit evidence$5: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
[ERROR]  cannot be applied to (WindowReduceFunction, WindowTimestampAddFunction)
[ERROR]       .apply(new WindowReduceFunction(), new WindowTimestampAddFunction())
[ERROR]        ^
[ERROR] one error found

Answers (1)

aljoscha

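You are using either an index position key, as in keyBy(1), or a field expression key, as in keyBy("field"). This means that the key type of the WindowedStream is Tuple (org.apache.flink.api.java.tuple.Tuple, to be specific). That is what the compiler error shows: both apply() overloads expect a WindowFunction whose key type is org.apache.flink.api.java.tuple.Tuple, not (Int, String).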
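A minimal sketch of what this looks like in code, assuming the Flink 1.x Scala API (`flink-streaming-scala`) used in the question, where positional `keyBy(0, 1)` is still available and `WindowedStream.apply(ReduceFunction, WindowFunction)` exists, as the error message shows. The explicit type on `keyed` makes the `Tuple` key type visible, so the window function declares `Tuple` as its third type argument. `TupleKeyTimestampAddFunction`, `TupleKeyExample`, `withTimestamps`, and the 5-second tumbling processing-time window are hypothetical choices for illustration; the question does not show how `myWindowedStream` was built.

```scala
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Sums the third field; keeps the first two fields of the first element.
class WindowReduceFunction extends ReduceFunction[(Int, String, Int)] {
  override def reduce(a: (Int, String, Int), b: (Int, String, Int)): (Int, String, Int) =
    (a._1, a._2, a._3 + b._3)
}

// KEY is the Java Tuple because the stream is keyed by field positions.
class TupleKeyTimestampAddFunction
    extends WindowFunction[(Int, String, Int), (Int, String, Int, Long), Tuple, TimeWindow] {
  override def apply(key: Tuple, window: TimeWindow, in: Iterable[(Int, String, Int)],
                     out: Collector[(Int, String, Int, Long)]): Unit =
    for (row <- in) out.collect((row._1, row._2, row._3, window.maxTimestamp()))
}

object TupleKeyExample {
  // Hypothetical helper that wires the two functions into a windowed pipeline.
  def withTimestamps(stream: DataStream[(Int, String, Int)]): DataStream[(Int, String, Int, Long)] = {
    // Positional keys: the key type is org.apache.flink.api.java.tuple.Tuple.
    val keyed: KeyedStream[(Int, String, Int), Tuple] = stream.keyBy(0, 1)
    keyed
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .apply(new WindowReduceFunction(), new TupleKeyTimestampAddFunction())
  }
}
```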

If you change the third generic argument of your WindowFunction from (Int, String) to Tuple, it should work. Alternatively, change your keyBy call to use a lambda function; then the WindowedStream gets the specific key type returned by that function. For example: keyBy(in => (in._1, in._2)).
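A sketch of the lambda-key variant, under the same assumption of the Flink 1.x Scala API, with `import org.apache.flink.streaming.api.scala._` supplying the implicit `TypeInformation` instances. Because the key selector returns `(Int, String)`, the question's `WindowTimestampAddFunction` can keep `(Int, String)` as its key type. The `window` parameter is typed as `TimeWindow` so that it matches the fourth type argument. `LambdaKeyExample`, `withTimestamps`, and the window assigner are hypothetical.

```scala
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Sums the third field; keeps the first two fields of the first element.
class WindowReduceFunction extends ReduceFunction[(Int, String, Int)] {
  override def reduce(a: (Int, String, Int), b: (Int, String, Int)): (Int, String, Int) =
    (a._1, a._2, a._3 + b._3)
}

// KEY is (Int, String) because the stream is keyed with a lambda returning that pair.
class WindowTimestampAddFunction
    extends WindowFunction[(Int, String, Int), (Int, String, Int, Long), (Int, String), TimeWindow] {
  override def apply(key: (Int, String), window: TimeWindow, in: Iterable[(Int, String, Int)],
                     out: Collector[(Int, String, Int, Long)]): Unit =
    for (row <- in) out.collect((row._1, row._2, row._3, window.maxTimestamp()))
}

object LambdaKeyExample {
  // Hypothetical helper that wires the two functions into a windowed pipeline.
  def withTimestamps(stream: DataStream[(Int, String, Int)]): DataStream[(Int, String, Int, Long)] = {
    // The key selector's return type becomes the key type of the KeyedStream.
    val keyed: KeyedStream[(Int, String, Int), (Int, String)] = stream.keyBy(in => (in._1, in._2))
    keyed
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .apply(new WindowReduceFunction(), new WindowTimestampAddFunction())
  }
}
```

The window function reads only the window, not the key, so both variants emit the same records; the choice only affects which key type the WindowFunction must declare. Later Flink 1.x releases deprecate this `apply` overload in favor of `reduce(preAggregator, windowFunction)`, which takes the same two arguments.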
