Kspace
Kspace

Reputation: 225

Apache Flink: ProcessWindowFunction is not applicable

I want to use a ProcessWindowFunction in my Apache Flink project. But I am getting some error when using process function, see below code snippet

The error is:

The method process(ProcessWindowFunction,R,Tuple,TimeWindow>) in the type WindowedStream,Tuple,TimeWindow> is not applicable for the arguments (JDBCExample.MyProcessWindows)

My program:

DataStream<Tuple2<String, JSONObject>> inputStream;

inputStream = env.addSource(new JsonArraySource());

inputStream.keyBy(0)
  .window(TumblingEventTimeWindows.of(Time.minutes(10)))
  .process(new MyProcessWindows());

My ProcessWindowFunction:

private class MyProcessWindows 
  extends ProcessWindowFunction<Tuple2<String, JSONObject>, Tuple2<String, String>, String, Window>
{

  public void process(
      String key, 
      Context context, 
      Iterable<Tuple2<String, JSONObject>> input, 
      Collector<Tuple2<String, String>> out) throws Exception 
  {
    ...
  }

}

Upvotes: 3

Views: 1247

Answers (2)

kkrugler
kkrugler

Reputation: 9290

What Fabian said :) Using Tuple should work, but does involve some ugly type casts in your ProcessWindowFunction. Using a KeySelector is easy and results in cleaner code. E.g.

.keyBy(new KeySelector<Tuple2<String,JsonObject>, String>() {

    @Override
    public String getKey(Tuple2<String, JsonObject> in) throws Exception {
        return in.f0;
    }
})

The above then lets you define a ProcessWindowFunction like:

public class MyProcessWindows extends ProcessWindowFunction<Tuple2<String, JsonObject>, Tuple2<String, String>, String, TimeWindow> {

Upvotes: 3

Fabian Hueske
Fabian Hueske

Reputation: 18997

The problem are probably the generic types of the ProcessWindowFunction.

You are referencing the key by position (keyBy(0)). Therefore, the compiler cannot infer its type (String) and you need to change the ProcessWindowFunction to:

private class MyProcessWindows 
    extends ProcessWindowFunction<Tuple2<String, JSONObject>, Tuple2<String, String>, Tuple, Window>

By replacing String by Tuple you have now a generic placeholder for keys that you can cast to Tuple1<String> when you need to access the key in the processElement() method:

public void process(
    Tuple key, 
    Context context, 
    Iterable<Tuple2<String, JSONObject>> input, 
    Collector<Tuple2<String, String>> out) throws Exception {

  String sKey = (String)((Tuple1)key).f0;
  ...
}

You can avoid the cast and use the proper type if you define a KeySelector<IN, KEY> function to extract the key, because the return type KEY of the KeySelector is known to the compiler.

Upvotes: 5

Related Questions