Reputation: 1
I have a Flink streaming program that has branching processing logic after a long transformation. Will the long transformation logic be executed multiple times? Pseudo code:
env = getEnvironment();
DataStream<Event> inputStream = getInputStream();
tempStream = inputStream.map(very_heavy_computation_func)
output1 = tempStream.map(func1);
output1.addSink(sink1);
output2 = tempStream.map(func2);
output2.addSink(sink2);
env.execute();
Questions:
1. How many times will inputStream.map(very_heavy_computation_func) be executed?
2. Do I need to cache tempStream (or use some other method) to avoid the previous transformation being executed multiple times?
Upvotes: 0
Views: 150
Reputation: 20043
You can actually answer (1) easily by just trying out more or less exactly your example:
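import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

public class TestProgram {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Stand-in for the expensive transformation: log every invocation.
        SingleOutputStreamOperator<Integer> stream = env.fromElements(1, 2, 3)
            .map(i -> {
                System.out.println("Executed expensive computation for: " + i);
                return i;
            });
        // Two independent branches consuming the same intermediate stream.
        stream.map(i -> i).addSink(new PrintSinkFunction<>());
        stream.map(i -> i).addSink(new PrintSinkFunction<>());
        env.execute();
    }
}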
produces (on my machine, for example):
Executed expensive computation for: 3
Executed expensive computation for: 1
Executed expensive computation for: 2
9> 3
8> 2
8> 2
9> 3
7> 1
7> 1
As the output shows, the expensive map runs once per element, and its result reaches both sinks. You can also find a more technical answer here, which explains how records are replicated to downstream operators rather than the source/operator being run multiple times.
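To make the replication idea concrete without needing a Flink cluster, here is a minimal plain-Java sketch. It is only a toy model, not Flink's actual implementation: the `FanOutSketch`, `Operator`, `connect`, `run` and `Result` names are all hypothetical, and the `+ 1` / `+ 2` branches merely stand in for `func1` / `func2` from the question. The point it illustrates is that an operator computes its result once per record and then hands that same result to every downstream branch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;

public class FanOutSketch {

    /** Toy operator (hypothetical, not a Flink class): applies fn once, then forwards the result to all branches. */
    static final class Operator<I, O> {
        private final Function<I, O> fn;
        private final List<Consumer<O>> downstream = new ArrayList<>();

        Operator(Function<I, O> fn) {
            this.fn = fn;
        }

        void connect(Consumer<O> consumer) {
            downstream.add(consumer);
        }

        void process(I record) {
            O result = fn.apply(record);               // computed exactly once per record
            for (Consumer<O> consumer : downstream) {  // the same result is replicated to each branch
                consumer.accept(result);
            }
        }
    }

    /** Holds what the run observed: invocation count and the contents of both sinks. */
    static final class Result {
        final int expensiveCalls;
        final List<Integer> sink1;
        final List<Integer> sink2;

        Result(int expensiveCalls, List<Integer> sink1, List<Integer> sink2) {
            this.expensiveCalls = expensiveCalls;
            this.sink1 = sink1;
            this.sink2 = sink2;
        }
    }

    /** Pushes the input through one "expensive" operator that feeds two branches. */
    static Result run(List<Integer> input) {
        AtomicInteger expensiveCalls = new AtomicInteger();
        List<Integer> sink1 = new ArrayList<>();
        List<Integer> sink2 = new ArrayList<>();

        Operator<Integer, Integer> expensive = new Operator<>(i -> {
            expensiveCalls.incrementAndGet();  // count how often the heavy function runs
            return i * 10;
        });
        expensive.connect(v -> sink1.add(v + 1));  // branch 1, stand-in for func1
        expensive.connect(v -> sink2.add(v + 2));  // branch 2, stand-in for func2

        for (Integer record : input) {
            expensive.process(record);
        }
        return new Result(expensiveCalls.get(), sink1, sink2);
    }

    public static void main(String[] args) {
        Result r = run(Arrays.asList(1, 2, 3));
        System.out.println("expensive calls: " + r.expensiveCalls);  // 3, not 6
        System.out.println("sink1: " + r.sink1);                     // [11, 21, 31]
        System.out.println("sink2: " + r.sink2);                     // [12, 22, 32]
    }
}
```

With three input records and two branches, the counter ends at 3: the number of heavy invocations depends on the number of records, not on how many branches consume the intermediate result.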
Upvotes: 1