Reputation: 809
I need to set a list of values in my program and access them in all of the task managers. Currently, I declared a public field in my main class and set the values. Later in my program, which will be run in a remote cluster, I would like to have access to this variables in all task managers. Here is my sample code. It seems there is a problem however: there is not any compile or run-time error, the values are not available to the task managers.
public class myMainClass {
public static ArrayList<String> mykey = new ArrayList<String>();
public static void main(String[] args) throws Exception {
// assign value to the variable
partitionedData = partitionedData.partitionCustom(new MyPartitioner(myKey), 2);
}
}
public static class MyPartitioner implements Partitioner<String> {
public String [] partitionKeys;
public static ArrayList<String> mykey;
public MyPartitioner(ArrayList<String> mykey) {
this.mykey = mykey;
}
@Override
public int partition(String key, int numPartitions) {
for (int i=0 ; i< numParalell-1 ; i++) {
if(mykey.get(i).compareToIgnoreCase(key) > 0)
return i;
}
return numParalell-1 ;
}
}
Upvotes: 3
Views: 2998
Reputation: 62330
I am not sure what you want to accomplish. If you want to pre-compute a (non-changing) value and distribute it to all task managers (I assume you need access those value in some operators), you can simple give those value via constructor parameters to your UDFs or use Flink's broadcast variables: https://ci.apache.org/projects/flink/flink-docs-release-0.8/programming_guide.html#broadcast-variables
Upvotes: 1
Reputation: 4542
I would pass the mykey
list as an constructor argument to the MyPartitioner
class.
Your code would look like this:
public class myMainClass {
public static void main(String[] args) throws Exception {
ArrayList<String> mykey = new ArrayList<String>();
// assign value to the vaiable
partitionedData = partitionedData.partitionCustom(new MyPartitioner(mykey), 2);
}
}
public static class MyPartitioner implements Partitioner<String> {
private final ArrayList<String> mykey;
public String [] partitionKeys;
public MyPartitioner(ArrayList<String> mykey) {
this.mykey = mykey;
}
@Override
public int partition(String key, int numPartitions) {
for (int i=0 ; i< numParalell-1 ; i++) {
if(mykey.get(i).compareToIgnoreCase(key) > 0)
return i;
}
return numParalell-1 ;
}
}
Upvotes: 2