Alexander Olsson

Does the windowAll operator in Flink scale the parallelism down to 1?

I have a stream in Flink that sends cubes from a source, applies a transformation to each cube (adding 1 to each element in the cube), and finally sends the result downstream to print the throughput per second.

The stream is parallelized over 4 threads.

If I understand correctly, the windowAll operator is a non-parallel transformation and should therefore scale the parallelism back down to 1. Used together with TumblingProcessingTimeWindows.of(Time.seconds(1)), it should sum the throughput of all parallel subtasks within the last second and print it. I'm unsure whether the output is correct, because the throughput for every second is printed like this:

1> 25
2> 226
3> 354
4> 372
1> 382
2> 403
3> 363
...

Question: Does the stream printer print the throughput from each thread (1, 2, 3 and 4), or does it just pick one thread, e.g. thread 3, to print the summed throughput of all subtasks?

When I set the parallelism of the environment to 1 at the beginning with env.setParallelism(1), I don't get the "x> " prefix before the throughput, but I seem to get the same (or even better) throughput as with parallelism 4, like this:

45
429
499
505
1
503
524
530
...

Here is a code-snippet of the program:

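import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Cube and CubeSource are the asker's own classes (not shown).

public class StreamingCase {
    public static void main(String[] args) throws Exception {
        int parallelism = 4;

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        // Default parallelism for every operator (source, map, sink, ...)
        env.setParallelism(parallelism);

        DataStream<Cube> start = env
                .addSource(new CubeSource());

        // Add 1 to every element of each cube
        DataStream<Cube> adder = start
                .map(new MapFunction<Cube, Cube>() {
                    @Override
                    public Cube map(Cube cube) throws Exception {
                        return cube.cubeAdd(1);
                    }
                });

        // windowAll is available on any DataStream, so no cast is needed.
        // The window operator itself always runs with parallelism 1.
        DataStream<Integer> throughput = adder
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
                .apply(new AllWindowFunction<Cube, Integer, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow tw,
                                      Iterable<Cube> values,
                                      Collector<Integer> out) throws Exception {
                        // Count the cubes that arrived in this one-second window
                        int count = 0;
                        for (Cube ignored : values) {
                            count++;
                        }
                        out.collect(count);
                    }
                });
        // The print sink uses the environment parallelism (4), hence the "k> " prefixes
        throughput.print();
        env.execute("Cube Stream of Sweetness");
    }
}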
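As a rough, Flink-free model of what the windowAll + AllWindowFunction step computes: every record, no matter which of the parallel map subtasks produced it, goes to the one window task, which assigns it to a one-second tumbling window by its processing timestamp and counts the records per window. The Event class, the timestamps and the method names are hypothetical and only for illustration; the window start is computed as timestamp minus (timestamp mod size), assuming a zero window offset.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WindowAllSketch {

    // Hypothetical record: which parallel subtask emitted it and when (ms).
    static final class Event {
        final int subtask;
        final long timestampMs;

        Event(int subtask, long timestampMs) {
            this.subtask = subtask;
            this.timestampMs = timestampMs;
        }
    }

    // Start of the tumbling window that contains ts (window offset 0).
    static long windowStart(long ts, long sizeMs) {
        return ts - (ts % sizeMs);
    }

    // The single window task sees records from all subtasks and counts them
    // per window; the subtask field is deliberately ignored.
    static Map<Long, Integer> countPerWindow(List<Event> events, long sizeMs) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (Event e : events) {
            counts.merge(windowStart(e.timestampMs, sizeMs), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event(1, 100), new Event(2, 250), new Event(3, 900),
                new Event(4, 999), new Event(1, 1000), new Event(4, 1500));
        // Prints {0=4, 1000=2}: one count per second, summed over all subtasks
        System.out.println(countPerWindow(events, 1000));
    }
}
```

Because all records meet in one task, each one-second window yields exactly one total, which is the per-second throughput the question expects.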

Answers (1)

Fabian Hueske

If the parallelism of the environment is set to, say, 3 and you use a WindowAll operator, only the window operator runs with parallelism 1. The sink still runs with parallelism 3. Hence, the plan looks as follows:

In_1 -\               /- Out_1
In_2 --- WindowAll_1 --- Out_2
In_3 -/               \- Out_3

The WindowAll operator emits its output to the subsequent sink tasks in a round-robin fashion. That is why the result records of the program are printed by different threads.
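The following plain-Java sketch (not Flink code) models this for the setup in the question: one window task producing one count per second, handed round-robin to a print sink with parallelism 4, where each sink instance prefixes its lines with its subtask number. The class and method names are hypothetical. For simplicity the sketch always starts at sink subtask 1; Flink's rebalancing may start at a different subtask, so the first prefix in a real run can differ. The counts are the first values from the question's output.

```java
import java.util.ArrayList;
import java.util.List;

public class RoundRobinPrintSketch {

    // Simulates a single windowAll task handing each result to the next
    // print-sink subtask in round-robin order (starting at index 0 here).
    static List<String> printLines(List<Integer> windowResults, int sinkParallelism) {
        List<String> lines = new ArrayList<>();
        int next = 0;
        for (int result : windowResults) {
            // The "k> " prefix (k = subtask index + 1) is only added
            // when the sink runs with more than one parallel instance.
            String prefix = sinkParallelism > 1 ? (next + 1) + "> " : "";
            lines.add(prefix + result);
            next = (next + 1) % sinkParallelism;
        }
        return lines;
    }

    public static void main(String[] args) {
        List<Integer> perSecondCounts = List.of(25, 226, 354, 372, 382);
        // Prints 1> 25, 2> 226, 3> 354, 4> 372, 1> 382 on separate lines
        printLines(perSecondCounts, 4).forEach(System.out::println);
        // Prints 25, 226, 354, 372, 382 on separate lines (no prefix)
        printLines(perSecondCounts, 1).forEach(System.out::println);
    }
}
```

Each line is still a complete per-second total; only the thread that prints it rotates. In a real job, if you want a single thread to print all results, you can set the sink's parallelism on its own, e.g. throughput.print().setParallelism(1), which leaves the rest of the pipeline at parallelism 4.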

When you set the environment parallelism to 1, all operators run as a single task, so the print sink also omits the "x> " prefix.
