Reputation: 225
I want to use a ProcessWindowFunction in my Apache Flink project, but I get a compile error when calling process(); see the code snippet below.
The error is:
The method process(ProcessWindowFunction<Tuple2<String,JSONObject>,R,Tuple,TimeWindow>) in the type WindowedStream<Tuple2<String,JSONObject>,Tuple,TimeWindow> is not applicable for the arguments (JDBCExample.MyProcessWindows)
My program:
DataStream<Tuple2<String, JSONObject>> inputStream;
inputStream = env.addSource(new JsonArraySource());
inputStream.keyBy(0)
    .window(TumblingEventTimeWindows.of(Time.minutes(10)))
    .process(new MyProcessWindows());
My ProcessWindowFunction:
private class MyProcessWindows
    extends ProcessWindowFunction<Tuple2<String, JSONObject>, Tuple2<String, String>, String, Window>
{
    public void process(
        String key,
        Context context,
        Iterable<Tuple2<String, JSONObject>> input,
        Collector<Tuple2<String, String>> out) throws Exception
    {
        ...
    }
}
Upvotes: 3
Views: 1247
Reputation: 9290
What Fabian said :) Using Tuple should work, but it involves some ugly type casts in your ProcessWindowFunction. Using a KeySelector is easy and results in cleaner code. E.g.
.keyBy(new KeySelector<Tuple2<String, JSONObject>, String>() {
    @Override
    public String getKey(Tuple2<String, JSONObject> in) throws Exception {
        // f0 is the String field that keyBy(0) referred to by position
        return in.f0;
    }
})
The above then lets you define a ProcessWindowFunction with String as the key type, like:
public class MyProcessWindows
    extends ProcessWindowFunction<Tuple2<String, JSONObject>, Tuple2<String, String>, String, TimeWindow> {
    ...
}
Upvotes: 3
Reputation: 18997
The problem is probably the generic types of the ProcessWindowFunction.
You are referencing the key by position (keyBy(0)). Therefore, the compiler cannot infer its type (String), and the key type of the resulting stream is Tuple. You need to change the ProcessWindowFunction to:
private class MyProcessWindows
    extends ProcessWindowFunction<Tuple2<String, JSONObject>, Tuple2<String, String>, Tuple, TimeWindow>
By replacing String with Tuple, you now have a generic placeholder for the key, which you can cast to Tuple1<String> when you need to access the key in the process() method:
public void process(
    Tuple key,
    Context context,
    Iterable<Tuple2<String, JSONObject>> input,
    Collector<Tuple2<String, String>> out) throws Exception {
    // keyBy(0) wraps the single key field in a Tuple1, so cast before reading f0
    String sKey = ((Tuple1<String>) key).f0;
    ...
}
You can avoid the cast and use the proper type if you define a KeySelector<IN, KEY> function to extract the key, because the return type KEY of the KeySelector is known to the compiler.
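To make the difference concrete, here is a minimal sketch in plain Java that does not use Flink at all. Tuple, WindowFn, Keyed, keyByPosition and keyBySelector are hypothetical stand-ins invented for this illustration, not Flink's classes or signatures. They only mimic how the key type is fixed at the moment the stream is keyed, and why the function passed to process() has to declare that same key type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class KeyTypeSketch {

    /** Hypothetical stand-in for a generic tuple: fields come back as Object. */
    static final class Tuple {
        private final List<Object> fields;
        Tuple(Object... fields) { this.fields = Arrays.asList(fields); }
        Object getField(int pos) { return fields.get(pos); }
        @Override public boolean equals(Object o) {
            return o instanceof Tuple && fields.equals(((Tuple) o).fields);
        }
        @Override public int hashCode() { return fields.hashCode(); }
    }

    /** Hypothetical stand-in for a window function, reduced to IN, OUT and KEY. */
    interface WindowFn<IN, OUT, KEY> {
        void process(KEY key, Iterable<IN> input, List<OUT> out);
    }

    /** Hypothetical keyed stream: KEY is fixed when the stream is keyed. */
    static final class Keyed<IN, KEY> {
        private final List<IN> elements;
        private final Function<IN, KEY> keyOf;

        Keyed(List<IN> elements, Function<IN, KEY> keyOf) {
            this.elements = elements;
            this.keyOf = keyOf;
        }

        /** Only accepts a function whose KEY matches this stream's KEY. */
        <OUT> List<OUT> process(WindowFn<IN, OUT, KEY> fn) {
            Map<KEY, List<IN>> groups = new LinkedHashMap<>();
            for (IN e : elements) {
                groups.computeIfAbsent(keyOf.apply(e), k -> new ArrayList<>()).add(e);
            }
            List<OUT> out = new ArrayList<>();
            for (Map.Entry<KEY, List<IN>> g : groups.entrySet()) {
                fn.process(g.getKey(), g.getValue(), out);
            }
            return out;
        }
    }

    /** Keying by position: the field type is unknown statically, so KEY is Tuple. */
    static Keyed<String[], Tuple> keyByPosition(List<String[]> data, int pos) {
        return new Keyed<String[], Tuple>(data, e -> new Tuple(e[pos]));
    }

    /** Keying by selector: KEY is whatever type the selector returns. */
    static <KEY> Keyed<String[], KEY> keyBySelector(List<String[]> data,
                                                    Function<String[], KEY> selector) {
        return new Keyed<String[], KEY>(data, selector);
    }

    private static int count(Iterable<?> items) {
        int n = 0;
        for (Object ignored : items) n++;
        return n;
    }

    public static List<String[]> sample() {
        List<String[]> data = new ArrayList<>();
        data.add(new String[] {"a", "1"});
        data.add(new String[] {"b", "2"});
        data.add(new String[] {"a", "3"});
        return data;
    }

    /** Positional key: the function must declare Tuple and cast the field. */
    public static List<String> viaPosition(List<String[]> data) {
        return keyByPosition(data, 0).process(new WindowFn<String[], String, Tuple>() {
            @Override public void process(Tuple key, Iterable<String[]> input, List<String> out) {
                String sKey = (String) key.getField(0);
                out.add(sKey + ":" + count(input));
            }
        });
    }

    /** Selector key: the function can declare String directly, no cast. */
    public static List<String> viaSelector(List<String[]> data) {
        Function<String[], String> selector = new Function<String[], String>() {
            @Override public String apply(String[] e) { return e[0]; }
        };
        return keyBySelector(data, selector).process(new WindowFn<String[], String, String>() {
            @Override public void process(String key, Iterable<String[]> input, List<String> out) {
                out.add(key + ":" + count(input));
            }
        });
    }

    public static void main(String[] args) {
        System.out.println(viaPosition(sample()));
        System.out.println(viaSelector(sample()));
    }
}
```

In this sketch, passing a WindowFn<String[], String, String> to the stream returned by keyByPosition is rejected by the compiler, which is the same kind of mismatch as the "not applicable for the arguments" error in the question.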
Upvotes: 5