Ahmad.S

Reputation: 809

Apache Flink: executing a program that extends RichFlatMapFunction on the remote cluster causes an error

I have the following code in Apache Flink. It works fine on the local cluster, but running it on the remote cluster throws a NullPointerException at the line containing the command "stack.push(recordPair);".

Does anyone know what the reason is?

The input dataset is the same on both the local and the remote cluster.

public static class TC extends RichFlatMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
    private static TreeSet<Tuple2<Integer, Integer>> treeSet_duplicate_pair;
    private static HashMap<Integer, Set<Integer>> clusters_duplicate_map;
    private static Stack<Tuple2<Integer, Integer>> stack;

    public TC(List<Tuple2<Integer, Integer>> duplicatsPairs) {
        ...
        stack = new Stack<Tuple2<Integer, Integer>>();
    }

    @Override
    public void flatMap(Tuple2<Integer, Integer> recordPair, Collector<Tuple2<Integer, Integer>> out) throws Exception {
        if (recordPair != null) {
            stack.push(recordPair); // NullPointerException here on the remote cluster
            ...
        }
    }
}

Upvotes: 1

Views: 1602

Answers (1)

Till Rohrmann

Reputation: 13346

The problem is that you initialize the stack variable in the constructor of the TC class. This initializes the static variable only for the JVM in which the client program runs. For the local execution this works because the Flink job is executed in the same JVM.

When you run it on the cluster, your TC instance is serialized and shipped to the cluster nodes. There, deserializing the instance does not call the constructor again, and static fields are not part of the serialized state at all, so stack stays null. To make this work, move the initialization logic to the open method of the RichFlatMapFunction, or use a static initializer. Be aware, though, that because stack is a class variable, all operator instances running on the same TaskManager will share the same stack.
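The mechanism can be reproduced with plain Java serialization, without Flink. This is a minimal sketch under stated assumptions: the class names DeserializationDemo and Fn are hypothetical, and setting the static field to null after serializing only simulates a freshly started TaskManager JVM in which the class has just been loaded. It shows that the constructor is not run on deserialization and that a static field is not carried along, while an instance field is.

```java
import java.io.*;
import java.util.Stack;

public class DeserializationDemo {
    // Hypothetical stand-in for TC: Flink ships user functions via Java serialization.
    static class Fn implements Serializable {
        static int constructorCalls = 0;
        static Stack<Integer> sharedStack;   // static: never written to the serialized form
        Stack<Integer> instanceStack;        // instance field: serialized with the object

        Fn() {
            constructorCalls++;
            sharedStack = new Stack<>();
            instanceStack = new Stack<>();
        }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] shipped = serialize(new Fn());     // "client" side: constructor runs here
        Fn.sharedStack = null;                    // simulate a fresh JVM (assumption, see above)
        Fn received = (Fn) deserialize(shipped);  // "TaskManager" side: no constructor call
        System.out.println("constructor calls: " + Fn.constructorCalls); // constructor calls: 1
        System.out.println("static stack: " + Fn.sharedStack);          // static stack: null
        System.out.println("instance stack: " + received.instanceStack); // instance stack: []
    }
}
```

Calling push on the null static field in this state fails with exactly the NullPointerException from the question.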

public static class TC extends RichFlatMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
    private static TreeSet<Tuple2<Integer, Integer>> treeSet_duplicate_pair;
    private static HashMap<Integer, Set<Integer>> clusters_duplicate_map;
    // either use a static initializer
    private static Stack<Tuple2<Integer, Integer>> stack = new Stack<Tuple2<Integer, Integer>>();

    public TC(List<Tuple2<Integer, Integer>> duplicatsPairs) {
        ...
    }

    @Override
    public void open(Configuration config) {
        // or initialize stack here, but then you have to synchronize the initialization,
        // because several parallel instances in the same TaskManager may call open() concurrently
        ...
    }

    @Override
    public void flatMap(Tuple2<Integer, Integer> recordPair, Collector<Tuple2<Integer, Integer>> out) throws Exception {
        if (recordPair != null) {
            stack.push(recordPair);
            ...
        }
    }
}
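The sharing caveat can also be illustrated in plain Java. This is a hedged sketch, not Flink code: StaticStackFn, InstanceStackFn and the open() method below are hypothetical stand-ins, with open() playing the role of RichFunction#open, which is called once per parallel instance. Two objects in one JVM model two parallel instances on the same TaskManager. With a static stack they see each other's records; with a non-static field initialized in open() each instance gets its own stack, which avoids both the sharing and the need to synchronize.

```java
import java.util.Stack;

public class SharedStateDemo {
    // Mirrors the static-initializer variant: one stack per loaded class, shared by all instances.
    static class StaticStackFn {
        static final Stack<Integer> stack = new Stack<>();
        void flatMap(int value) { stack.push(value); }
        int size() { return stack.size(); }
    }

    // Mirrors a non-static field initialized in open(); transient because it is rebuilt per instance.
    static class InstanceStackFn {
        private transient Stack<Integer> stack;
        void open() { stack = new Stack<>(); } // hypothetical stand-in for RichFunction#open
        void flatMap(int value) { stack.push(value); }
        int size() { return stack.size(); }
    }

    public static void main(String[] args) {
        // Two "parallel instances" living in the same JVM.
        StaticStackFn a = new StaticStackFn(), b = new StaticStackFn();
        a.flatMap(1);
        b.flatMap(2);
        System.out.println(a.size() + " " + b.size()); // 2 2

        InstanceStackFn c = new InstanceStackFn(), d = new InstanceStackFn();
        c.open();
        d.open();
        c.flatMap(1);
        d.flatMap(2);
        System.out.println(c.size() + " " + d.size()); // 1 1
    }
}
```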

Upvotes: 3
