a_confused_student

Reputation: 404

implementation of RoundRobin partitioning in Apache Flink

Hi, I would like to implement RoundRobin partitioning for an operator in Apache Flink. Before I continue, I'd like to preface that I'm well aware this is already implemented in Flink, but it is just one of several implementations I want to build, which I would then modify to create w-choices (see https://arxiv.org/pdf/1510.05714.pdf).

Here's a graphical representation of what I'm trying to do, where the operation is a max over tuple pairs of <key: String (a letter); value: Int>:

(image: the intended topology)

Currently, what I have to imitate this is the following code:

DataStream<Tuple2<String, Integer>> split = operatorAggregateStream
        .partitionCustom(new RoundRobin(), value -> value.f0)
        .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
        .process(new MaxPartialWindowProcessFunction());

DataStream<Tuple2<String, Integer>> reconciliation = split
        .partitionCustom(new SingleCast(), value -> value.f0)
        .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
        .process(new MaxPartialWindowProcessFunction());

I first partition in a round-robin fashion, then apply a window and do a "partial function application" in the process function. Afterwards, using SingleCast, I converge everything to one partition. But judging by the output, this does not seem to work.

Here is the code for RoundRobin and SingleCast:

public class SingleCast implements Partitioner<String> {
    @Override
    public int partition(String key, int numPartitions) {
        return 0;
    }
}

public class RoundRobin implements Partitioner<String> {

    int index;

    RoundRobin() {
        index = 0;
    }

    @Override
    public int partition(String key, int numPartitions) {
        index++;
        return Math.abs(index % numPartitions);
    }
}
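As a quick sanity check of the partitioner itself (independent of how Flink deploys it), here is a plain-Java sketch of the same cycling logic. Note that because `index` is incremented before use, the cycle starts at partition 1 rather than 0; also, each parallel upstream subtask gets its own `RoundRobin` instance, so the counters are independent. The class names here are illustrative only:

```java
// Plain-Java check of the RoundRobin cycling logic above (no Flink needed).
public class RoundRobinDemo {

    static class RoundRobin {
        int index = 0;

        int partition(String key, int numPartitions) {
            index++;
            return Math.abs(index % numPartitions);
        }
    }

    public static void main(String[] args) {
        RoundRobin rr = new RoundRobin();
        // With 4 partitions the sequence starts at 1, not 0:
        for (int i = 0; i < 6; i++) {
            System.out.print(rr.partition("ignored", 4) + " ");
        }
        // prints: 1 2 3 0 1 2
        System.out.println();
    }
}
```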

and the code for MaxPartialWindowProcessFunction:

public class MaxPartialWindowProcessFunction extends ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> {

    // Map from key to the maximum value seen so far
    private HashMap<String, Integer> map;

    @Override
    public void open(Configuration parameters) throws Exception {
        map = new HashMap<>();
    }

    @Override
    public void process(Context context, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> collector) throws Exception {
        for (Tuple2<String, Integer> value : input) {
            String key = value.f0;
            synchronized (map) {
                if (!map.containsKey(key)) {
                    map.put(key, value.f1);
                } else if (value.f1 > map.get(key)) {
                    map.put(key, value.f1);
                }
            }
        }

        for (String k : map.keySet()) {
            collector.collect(Tuple2.of(k, map.get(k)));
        }
    }
}
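One thing worth noting in the function above: `map` is created once in `open()` and never cleared, so it lives for the operator's lifetime and maxima from earlier windows leak into every later window's output. A minimal plain-Java simulation of two consecutive `process()` calls makes this visible (the class and method names here are illustrative, not Flink API):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simulates the window function above: because `map` survives across
// process() calls, window 2 re-emits window 1's keys.
public class StaleMapDemo {

    static Map<String, Integer> map = new HashMap<>();

    // Mirrors the body of process() for one window's worth of input.
    static Map<String, Integer> processWindow(List<Map.Entry<String, Integer>> input) {
        for (Map.Entry<String, Integer> e : input) {
            map.merge(e.getKey(), e.getValue(), Math::max); // same max logic as the if/else
        }
        return new HashMap<>(map); // what would be collected
    }

    public static void main(String[] args) {
        Map<String, Integer> w1 = processWindow(List.of(Map.entry("A", 100)));
        Map<String, Integer> w2 = processWindow(List.of(Map.entry("B", 50)));
        System.out.println(w1); // {A=100}
        System.out.println(w2); // contains both A and B: "A" leaked into window 2
    }
}
```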

When I create a more basic implementation without windows, RoundRobin and SingleCast work perfectly:

DataStream<Tuple2<String, Integer>> aggregation = operatorAggregateStream
        .partitionCustom(new RoundRobin(), value -> value.f0) // any cast
        .process(new MaxPartialFunction());

DataStream<Tuple2<String, Integer>> reconciliation = aggregation
        .partitionCustom(new SingleCast(), value -> value.f0)
        .process(new MaxPartialFunction());

The issue with my windowed implementation is that it does not seem to partition correctly: it only creates three partitions or fewer. The reconciliation step then does not converge to a single partition either; it uses three different partitions. Using Flink's print method, I obtain this output:

split:13> (A,100)
split:15> (C,100)
split:14> (B,100)
reconciliation:2> (B,100)
reconciliation:1> (A,100)
reconciliation:3> (C,100)

Ideally, there should be multiple split partitions (by default 16, if I recall correctly) and just one reconciliation partition.

Please help me, I'm at my wit's end.

Upvotes: 1

Views: 112

Answers (1)

kkrugler

Reputation: 9255

Doing a partial/unkeyed max is an interesting optimization, but it's hard. The main issue is that for the unkeyed max operator to work properly with windowing, it needs to know about time (so it can flush results when the downstream complete-max windowing operator needs them).

But you don't have access to regular Flink timers, because it's an unkeyed stream. So now you're using your own timer service, checking watermarks, etc. That means you really need to implement an operator at the level above Flink's functions, which is possible but non-trivial.
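To illustrate the idea of tracking time yourself, here is a minimal plain-Java sketch of the flushing logic such an operator would need: buffer per-key partial maxima, watch the watermark, and emit (then reset) the buffer once the watermark passes the end of the current 5-second window. All names here are hypothetical; in a real job this logic would live inside a custom Flink operator (e.g. in its element- and watermark-handling methods), and the wiring of such an operator is beyond this sketch:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch (hypothetical names, not Flink API): buffer per-key
// partial maxima and flush them when the observed watermark passes the
// end of the current 5-second window.
public class PartialMaxBuffer {
    static final long WINDOW_MS = 5_000;

    private final Map<String, Integer> partialMax = new HashMap<>();
    private long windowEnd = WINDOW_MS; // end timestamp of the open window

    /** Record one element (a custom operator would call this per record). */
    public void onElement(String key, int value) {
        partialMax.merge(key, value, Math::max);
    }

    /** Track watermarks; returns the flushed maxima once a window closes. */
    public Map<String, Integer> onWatermark(long watermark) {
        if (watermark < windowEnd) {
            return Map.of(); // window still open, nothing to emit yet
        }
        Map<String, Integer> flushed = new HashMap<>(partialMax);
        partialMax.clear();     // reset state for the next window
        windowEnd += WINDOW_MS; // advance to the next window boundary
        return flushed;
    }
}
```

Note that, unlike the question's `MaxPartialWindowProcessFunction`, the buffer is cleared on every flush, so no state leaks between windows.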

Upvotes: 1
