Reputation: 21
I have a stream in Flink that sends cubes from a source, applies a transformation to each cube (adding 1 to each element), and finally sends the result downstream to print the throughput for every second.
The stream is parallelized over 4 threads.
If I understand correctly, the windowAll operator is a non-parallel transformation and should therefore scale the parallelism back down to 1. Used together with TumblingProcessingTimeWindows.of(Time.seconds(1)), it should sum the throughput of all parallel subtasks within the latest second and print it. I'm unsure whether I get the correct output, since the throughput for each second is printed like this:
1> 25
2> 226
3> 354
4> 372
1> 382
2> 403
3> 363
...
Question: Does the stream printer print the throughput from each thread (1, 2, 3 and 4), or does it just pick one thread, e.g. thread 3, to print the summed throughput of all the subtasks?
When I set the parallelism of the environment to 1 at the beginning with env.setParallelism(1), I don't get the "x> " prefix before the throughput, but I seem to get the same (or even better) throughput as when it is set to 4. Like this:
45
429
499
505
1
503
524
530
...
Here is a code-snippet of the program:
imports...
public class StreamingCase {
    public static void main(String[] args) throws Exception {
        int parallelism = 4;
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.setParallelism(parallelism);

        DataStream<Cube> start = env
                .addSource(new CubeSource());

        DataStream<Cube> adder = start
                .map(new MapFunction<Cube, Cube>() {
                    @Override
                    public Cube map(Cube cube) throws Exception {
                        return cube.cubeAdd(1);
                    }
                });

        DataStream<Integer> throughput = ((SingleOutputStreamOperator<Cube>) adder)
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
                .apply(new AllWindowFunction<Cube, Integer, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow tw,
                                      Iterable<Cube> values,
                                      Collector<Integer> out) throws Exception {
                        int sum = 0;
                        for (Cube c : values) {
                            sum++;
                        }
                        out.collect(sum);
                    }
                });

        throughput.print();
        env.execute("Cube Stream of Sweetness");
    }
}
Upvotes: 2
Views: 1869
Reputation: 18987
If the parallelism of the environment is set to 3 and you are using a WindowAll operator, only the window operator runs with parallelism 1. The sink still runs with parallelism 3. Hence, the plan looks as follows:
In_1 -\               /- Out_1
In_2 --- WindowAll_1 --- Out_2
In_3 -/               \- Out_3
The WindowAll operator emits its output to its subsequent tasks using a round-robin strategy. That is why different threads emit the result records of the program: each printed line is the total for one window, and only the sink subtask that prints it changes.
When you set the environment parallelism to 1, all operators run with a single task.
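To make the round-robin hand-off concrete, here is a minimal plain-Java sketch that does not use Flink at all. It only simulates one upstream task passing each per-window total to the next of N sink subtasks in turn. The class RoundRobinSketch and the method distribute are hypothetical names invented for this illustration, and the "n> " prefix simply mimics the print format shown in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation, not Flink code: one non-parallel producer
// (standing in for the WindowAll task) hands each result to the next
// sink subtask in turn.
class RoundRobinSketch {

    // Returns the lines the sink subtasks would print, in emission order.
    static List<String> distribute(List<Integer> windowResults, int sinkParallelism) {
        List<String> printed = new ArrayList<>();
        int next = 0; // index of the sink subtask that receives the next record
        for (int result : windowResults) {
            // Each record is already the total for the whole window;
            // only the subtask that prints it changes.
            printed.add((next + 1) + "> " + result);
            next = (next + 1) % sinkParallelism;
        }
        return printed;
    }

    public static void main(String[] args) {
        // Per-second totals taken from the output in the question.
        List<Integer> perSecondCounts = Arrays.asList(25, 226, 354, 372, 382, 403, 363);
        for (String line : distribute(perSecondCounts, 4)) {
            System.out.println(line);
        }
    }
}
```

With four sink subtasks the prefixes cycle 1, 2, 3, 4, 1, 2, 3, which matches the pattern in the question. In the actual job, calling setParallelism(1) on the sink returned by throughput.print() should keep all results on a single sink subtask while the source and map stay parallel.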
Upvotes: 6