Reputation: 357
I'm trying to apply per-window processing to a Flink DataStream. Here is my code:
DataStream<Tuple2<String, String>> data = ...
DataStream<Tuple2<String, String>> freqCityChangeTransactions = data
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .process(new MyProcessWindowFunction());
Here is my implementation of MyProcessWindowFunction:
public static class MyProcessWindowFunction
        extends ProcessWindowFunction<Tuple2<String, String>, Tuple2<String, String>, String, TimeWindow> {
    public void process(String key,
                        Context context,
                        Iterable<Tuple2<String, String>> input,
                        Collector<Tuple2<String, String>> out) {
        // Do something ...
    }
}
However, when I try to compile the above code with Maven, I get the following error:
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.6.1:compile (default-compile) on project flink-examples: Compilation failure
[ERROR] /Users/furqan/Workspace/flink/src/main/java/com/baig/bank/Bank.java:[120,13] no suitable method found for process(com.baig.Bank.MyProcessWindowFunction)
[ERROR] method org.apache.flink.streaming.api.datastream.WindowedStream.<R>process(org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction<org.apache.flink.api.java.tuple.Tuple2<java.lang.String,java.lang.String>,R,org.apache.flink.api.java.tuple.Tuple,org.apache.flink.streaming.api.windowing.windows.TimeWindow>) is not applicable
[ERROR] (cannot infer type-variable(s) R
[ERROR] (argument mismatch; com.baig.Bank.MyProcessWindowFunction cannot be converted to org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction<org.apache.flink.api.java.tuple.Tuple2<java.lang.String,java.lang.String>,R,org.apache.flink.api.java.tuple.Tuple,org.apache.flink.streaming.api.windowing.windows.TimeWindow>))
[ERROR] method org.apache.flink.streaming.api.datastream.WindowedStream.<R>process(org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction<org.apache.flink.api.java.tuple.Tuple2<java.lang.String,java.lang.String>,R,org.apache.flink.api.java.tuple.Tuple,org.apache.flink.streaming.api.windowing.windows.TimeWindow>,org.apache.flink.api.common.typeinfo.TypeInformation<R>) is not applicable
[ERROR] (cannot infer type-variable(s) R
[ERROR] (actual and formal argument lists differ in length))
Any idea what I might be doing wrong here? FYI, I'm working with Apache Flink version 1.5.1 and compiling the Java code with Maven 3 on a Mac.
Upvotes: 3
Views: 1531
Reputation: 43454
The problem is that there's a mismatch between the KeySelector used in the keyBy and the key type specified in the ProcessWindowFunction. You have specified the key using an index into the Tuple2, and as a consequence, the compiler is unable to infer that the keys will be Strings. In this situation, Flink passes the key as a Tuple.
There are a couple of ways you can fix this. If you leave the keyBy as is, then you'll need to modify the ProcessWindowFunction to use Tuple as the key type, and you'll have to extract the String from that Tuple if you want to use it, with something like ((Tuple1<String>)key).f0. A better solution would be to use a more explicit key selector, such as keyBy(t -> t.f0), so that the keys are known to be Strings at compile time.
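To make the two options concrete, here is a minimal sketch of both side by side. It assumes the Flink 1.5.x DataStream API on the classpath. The class names KeyTypeFixes, TupleKeyedFunction and StringKeyedFunction, the sample elements, and the pass-through window logic are placeholders of mine, not part of the original code. For the first option it reads the key with key.getField(0) rather than the cast shown above; both should yield the same String.

```java
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class KeyTypeFixes {

    // Option 1: keep keyBy(0). The key then arrives as a Tuple,
    // so the third type parameter must be Tuple, not String.
    public static class TupleKeyedFunction
            extends ProcessWindowFunction<Tuple2<String, String>, Tuple2<String, String>, Tuple, TimeWindow> {
        @Override
        public void process(Tuple key,
                            Context context,
                            Iterable<Tuple2<String, String>> input,
                            Collector<Tuple2<String, String>> out) {
            // Pull the String out of the single-field key tuple.
            String k = key.getField(0);
            for (Tuple2<String, String> element : input) {
                out.collect(Tuple2.of(k, element.f1)); // placeholder logic
            }
        }
    }

    // Option 2: key with a selector function. The key type is now
    // known to be String at compile time, as in the original class.
    public static class StringKeyedFunction
            extends ProcessWindowFunction<Tuple2<String, String>, Tuple2<String, String>, String, TimeWindow> {
        @Override
        public void process(String key,
                            Context context,
                            Iterable<Tuple2<String, String>> input,
                            Collector<Tuple2<String, String>> out) {
            for (Tuple2<String, String> element : input) {
                out.collect(Tuple2.of(key, element.f1)); // placeholder logic
            }
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical sample input standing in for the real source.
        DataStream<Tuple2<String, String>> data = env.fromElements(
                Tuple2.of("alice", "Berlin"),
                Tuple2.of("alice", "Paris"));

        // Option 1: index-based key, Tuple-keyed function.
        data.keyBy(0)
            .timeWindow(Time.seconds(5))
            .process(new TupleKeyedFunction())
            .print();

        // Option 2: selector-based key, String-keyed function.
        data.keyBy(t -> t.f0)
            .timeWindow(Time.seconds(5))
            .process(new StringKeyedFunction())
            .print();

        env.execute("key-type-fixes");
    }
}
```

Both pipelines should compile because the key type declared on the function now matches what keyBy produces. With a small bounded source like this one and processing-time windows, the job may finish before a window fires, so treat this as a compile-level illustration rather than something that will necessarily print results.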
Upvotes: 3