Peter Rubi

Reputation: 119

Flink apply function on timeWindow

I'm currently working on a Flink project. The main idea is to read a data stream of JSON objects (network logs), correlate them, and generate a new JSON object that combines information from the different input objects.

At the moment, I'm able to read the JSON objects, generate a KeyedStream (keyed by the machine that generated the log), and then apply a 5-second time window.

The next step is to use the apply function on the window to combine the information from each JSON object. I'm a bit confused about how to do it.

This is the code I currently have:

DataStream<Tuple2<String,JSONObject>> MetaAlert = events
                .flatMap(new JSONParser())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .apply(new generateMetaAlert());




public static class generateMetaAlert implements WindowFunction<Tuple2<String,JSONObject>, Tuple2<String,JSONObject>, String, Window> {

    @Override
    public void apply(String arg0, Window arg1, Iterable<Tuple2<String, JSONObject>> arg2,
            Collector<Tuple2<String, JSONObject>> arg3) throws Exception {
        // the JSON objects of the window should be combined here
    }
}

The .apply(new generateMetaAlert()) call fails to compile with the following error:

The method apply(WindowFunction<Tuple2<String,JSONObject>,R,Tuple,TimeWindow>) in the type WindowedStream<Tuple2<String,JSONObject>,Tuple,TimeWindow> is not applicable for the arguments (MetaAlertGenerator.generateMetaAlert)

Is there a different way to structure this code than the one I came up with?

Thank you in advance for your help.

Upvotes: 0

Views: 1188

Answers (1)

user4078581


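When you call keyBy with a field position (keyBy(0)) instead of a KeySelector, the key type in your custom WindowFunction (the third type parameter) must be Tuple, because the compiler cannot determine the actual type of the key. The window type (the fourth type parameter) must also be TimeWindow rather than Window, since that is what timeWindow produces, as the error message shows. This code compiles with no errors (note that I filled in the blanks with dummy code):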

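import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class Test {

    public Test() {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<String> events = env.readTextFile("datastream.log");

        DataStream<Tuple2<String, JSONObject>> MetaAlert
                = events
                .flatMap(new JSONParser())
                .keyBy(0)                       // position-based key: the key type becomes Tuple
                .timeWindow(Time.seconds(5))    // produces TimeWindow instances
                .apply(new GenerateMetaAlert());

    }

    // Dummy placeholder for the real JSON type
    public static class JSONObject {
    }

    // Nested classes are static, as in the question, so they hold no reference to the enclosing instance
    public static class JSONParser implements FlatMapFunction<String, Tuple2<String, JSONObject>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, JSONObject>> collector) throws Exception {
            // dummy: parsing omitted
        }
    }

    // Third type parameter is Tuple (because of keyBy(0)), fourth is TimeWindow
    public static class GenerateMetaAlert implements WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, Tuple, TimeWindow> {
        @Override
        public void apply(Tuple key, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception {
            // dummy: combination logic omitted
        }
    }

}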
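
To make the typing issue more concrete, here is a minimal plain-Java sketch of why the two styles of keyBy lead to different key types. It is not Flink code: MiniStream, Keyed and the Tuple class below are hypothetical miniatures that only mimic the shape of the two keyBy overloads (one taking a field position and returning a stream keyed by Tuple, one taking a selector and returning a stream keyed by the selector's return type).

```java
import java.util.function.Function;

// Hypothetical miniature of the two keyBy overloads; illustration only, not Flink code.
class KeyTypeSketch {

    // Stand-in for an untyped tuple base class: field types are not known statically.
    static class Tuple {
        final Object[] fields;
        Tuple(Object... fields) { this.fields = fields; }
        @SuppressWarnings("unchecked")
        <T> T getField(int pos) { return (T) fields[pos]; }
    }

    // A keyed stream only remembers the static key type K.
    static class Keyed<T, K> {
        final Function<T, K> keyOf;
        Keyed(Function<T, K> keyOf) { this.keyOf = keyOf; }
    }

    static class MiniStream<T extends Tuple> {
        // Position-based: the compiler cannot know the field's type, so K is Tuple.
        Keyed<T, Tuple> keyBy(int field) {
            return new Keyed<>(t -> new Tuple(t.fields[field]));
        }
        // Selector-based: K is whatever the selector returns.
        <K> Keyed<T, K> keyBy(Function<T, K> selector) {
            return new Keyed<>(selector);
        }
    }

    public static void main(String[] args) {
        MiniStream<Tuple> stream = new MiniStream<>();
        Tuple event = new Tuple("host-1", "payload");

        Keyed<Tuple, Tuple> byPosition = stream.keyBy(0);
        Keyed<Tuple, String> bySelector = stream.keyBy(t -> t.<String>getField(0));

        String k1 = byPosition.keyOf.apply(event).getField(0); // key is wrapped, must be unwrapped
        String k2 = bySelector.keyOf.apply(event);             // key is already a String
        System.out.println(k1 + " " + k2); // prints "host-1 host-1"
    }
}
```

The same reasoning applies to the WindowFunction: its third type parameter has to match the static key type of the keyed stream it is applied to.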

Alternatively, you can pass an anonymous class directly to apply. Note that with keyBy(0) the key type is still Tuple, not String; the String value can be read with tuple.getField(0):

DataStream<Tuple2<String, JSONObject>> MetaAlert
        = events
        .flatMap(new JSONParser())
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .apply(new WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, Tuple, TimeWindow>() {
            @Override
            public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception {
                // Your code here
            }
        });

Finally, if you want to keep the separate class and also keep String as the type of your key, you can pass a KeySelector to keyBy:

public class Test {

    public Test() {

        // events is the DataStream<String> created as in the first example
        DataStream<Tuple2<String, JSONObject>> MetaAlert
                = events
                .flatMap(new JSONParser())
                .keyBy(new KeySelector<Tuple2<String,JSONObject>, String>() {
                    @Override
                    public String getKey(Tuple2<String, JSONObject> json) throws Exception {
                        return json.f0; // the key type is now String
                    }
                })
                .timeWindow(Time.seconds(5))
                .apply(new GenerateMetaAlert());
    }

    // Third type parameter is String, matching the KeySelector's return type
    public class GenerateMetaAlert implements WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, String, TimeWindow> {
        @Override
        public void apply(String key, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception {
            // Your code here
        }
    }

}
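
As for what goes inside apply: the question does not say how the JSON objects should be correlated, so the following is only a rough sketch of one possible shape, written in plain Java so it runs without Flink. Map<String, Object> stands in for JSONObject, and the field names "msg", "machine", "count" and "messages" are assumptions made up for the illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for the body of WindowFunction.apply(...).
// Map<String, Object> is a hypothetical substitute for JSONObject.
class MetaAlertSketch {

    // Merges all events of one window (all sharing the same key) into one summary record.
    static Map<String, Object> combine(String key, Iterable<Map<String, Object>> events) {
        List<Object> messages = new ArrayList<>();
        int count = 0;
        for (Map<String, Object> event : events) {
            messages.add(event.get("msg")); // "msg" is an assumed field name
            count++;
        }
        Map<String, Object> metaAlert = new LinkedHashMap<>();
        metaAlert.put("machine", key);
        metaAlert.put("count", count);
        metaAlert.put("messages", messages);
        return metaAlert;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> window = new ArrayList<>();
        window.add(Collections.<String, Object>singletonMap("msg", "login failed"));
        window.add(Collections.<String, Object>singletonMap("msg", "port scan"));
        System.out.println(combine("host-1", window));
        // prints {machine=host-1, count=2, messages=[login failed, port scan]}
    }
}
```

In the real WindowFunction you would iterate over the Iterable<Tuple2<String, JSONObject>> argument in the same way, read each JSONObject from the tuple's f1 field, and emit the combined result with collector.collect(new Tuple2<>(key, result)). With keyBy(0) the key arrives as a Tuple, so the String would first have to be extracted with key.getField(0).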

Upvotes: 2
