I have the following code in Apache Flink. It works fine on the local cluster, but running it on the remote cluster throws a NullPointerException at the line containing "stack.push(recordPair);".
Does anyone know what the reason is?
The input dataset is the same for both the local and the remote cluster.
public static class TC extends RichFlatMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
    private static TreeSet<Tuple2<Integer, Integer>> treeSet_duplicate_pair;
    private static HashMap<Integer, Set<Integer>> clusters_duplicate_map;
    private static Stack<Tuple2<Integer, Integer>> stack;

    public TC(List<Tuple2<Integer, Integer>> duplicatsPairs) {
        ...
        stack = new Stack<Tuple2<Integer, Integer>>();
    }

    @Override
    public void flatMap(Tuple2<Integer, Integer> recordPair, Collector<Tuple2<Integer, Integer>> out) throws Exception {
        if (recordPair != null) {
            stack.push(recordPair); // NullPointerException here on the remote cluster
            ...
        }
    }
}
The problem is that you initialize the stack variable in the constructor of the TC class. Because stack is static, this initializes it only in the JVM in which the client program runs. Local execution works because the Flink job is executed in that same JVM.
When you run the job on the cluster, your TC instance is serialized and shipped to the cluster nodes. Static fields are not part of the serialized state, and deserializing the instance does not call the constructor again, so stack is still null in the TaskManager's JVM. To make this work, move the initialization logic into the open method of the RichFlatMapFunction, or use a static initializer. Be aware, however, that because stack is a class variable, all parallel instances of TC that run on the same TaskManager will share the same stack instance.
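The failure mode can be reproduced in plain Java without Flink. This is only an illustration, assuming standard Java serialization as a stand-in for how the function object is shipped to the cluster; the names StaticFieldDemo, Fn and roundTrip are hypothetical. Setting the static field to null by hand simulates the class being freshly loaded in a TaskManager JVM.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Stack;

public class StaticFieldDemo {

    // Hypothetical stand-in for TC: its constructor initializes a static field.
    static class Fn implements Serializable {
        private static final long serialVersionUID = 1L;
        static Stack<Integer> stack;     // static, so it is NOT written by serialization
        static int constructorCalls = 0;

        Fn() {
            constructorCalls++;
            stack = new Stack<>();
        }
    }

    // Serialize and deserialize an object, roughly like shipping it to another node.
    static Object roundTrip(Object o) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(o);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Fn client = new Fn();               // "client JVM": constructor runs, stack is set
        Fn.stack = null;                    // simulate the class being freshly loaded on a TaskManager
        Fn worker = (Fn) roundTrip(client); // deserialization does not call Fn()
        System.out.println("constructor calls: " + Fn.constructorCalls);     // prints "constructor calls: 1"
        System.out.println("stack after deserialization: " + Fn.stack);      // prints "stack after deserialization: null"
        System.out.println("deserialized instance present: " + (worker != null)); // prints "deserialized instance present: true"
    }
}
```

Calling push on Fn.stack at this point would throw the same NullPointerException seen on the remote cluster.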
public static class TC extends RichFlatMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
    private static TreeSet<Tuple2<Integer, Integer>> treeSet_duplicate_pair;
    private static HashMap<Integer, Set<Integer>> clusters_duplicate_map;

    // either use a static initializer
    private static Stack<Tuple2<Integer, Integer>> stack = new Stack<Tuple2<Integer, Integer>>();

    public TC(List<Tuple2<Integer, Integer>> duplicatsPairs) {
        ...
    }

    @Override
    public void open(Configuration config) {
        // or initialize stack here; because stack is static and shared by all
        // instances in the same TaskManager, this initialization must be synchronized
        ...
    }

    @Override
    public void flatMap(Tuple2<Integer, Integer> recordPair, Collector<Tuple2<Integer, Integer>> out) throws Exception {
        if (recordPair != null) {
            stack.push(recordPair);
            ...
        }
    }
}
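To illustrate the sharing caveat, here is a minimal plain-Java sketch, assuming that two instances in one JVM stand in for two parallel TC instances on one TaskManager (the SharedStaticDemo and Op classes are hypothetical). A field set by a static initializer is shared by every instance, while an instance field is not.

```java
import java.util.Stack;

public class SharedStaticDemo {

    // Hypothetical simplified operator with one static and one per-instance stack.
    static class Op {
        // Static initializer: one stack per loaded class, shared by every Op instance.
        static final Stack<Integer> SHARED = new Stack<>();
        // Instance field: every Op instance gets its own stack.
        final Stack<Integer> own = new Stack<>();

        void process(int value) {
            SHARED.push(value);
            own.push(value);
        }
    }

    public static void main(String[] args) {
        // Two instances in one JVM, standing in for two parallel instances on one TaskManager.
        Op a = new Op();
        Op b = new Op();
        a.process(1);
        b.process(2);
        System.out.println("shared: " + Op.SHARED); // prints "shared: [1, 2]"
        System.out.println("a.own: " + a.own);      // prints "a.own: [1]"
        System.out.println("b.own: " + b.own);      // prints "b.own: [2]"
    }
}
```

The records from both instances end up mixed in the shared stack. Making stack a non-static field and creating it in open would typically give each parallel instance its own stack and remove the need for synchronization.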