Reputation: 948
Let's say you are working on a big Flink project, and you key the stream by your customers' client IP addresses.
You then realize that you are applying the same filter in several different places in the code, like this:
public void calculationOne() {
    kafkaSource.filter(isContainsSmthA).keyBy(clientip).process(processA).sink(...);
}
public void calculationTwo() {
    kafkaSource.filter(isContainsSmthA).keyBy(clientip).process(processB).sink(...);
}
Assume there are many such kafkaSource.filter(isContainsSmthA) calls.
Does this structure lead to a performance issue in Flink?
Would it be much better to do something like the following?
public Stream filteredA() {
    return kafkaSource.filter(isContainsSmthA);
}
public void calculationOne() {
    filteredA().keyBy(clientip).process(processA).sink(...);
}
public void calculationTwo() {
    filteredA().keyBy(clientip).process(processB).sink(...);
}
Upvotes: 0
Views: 674
Reputation: 9265
I might do something like the following, where a simple wrapper operator runs the data through two different functions and emits each result to its own side output.
SingleOutputStreamOperator comboResults = kafkaSource
    .filter(isContainsSmthA)
    .keyBy(clientip)
    .process(new MyWrapperFunction(processA, processB));
comboResults
    .getSideOutput(processATag)
    .sink(...);
comboResults
    .getSideOutput(processBTag)
    .sink(...);
Though I don't know how that compares with what Arvid suggested.
Upvotes: 0
Reputation: 3634
It depends a bit on how it should behave operationally.
The first way is more friendly to the Kafka cluster: all records are read once. The filter itself is a very cheap operation, so you don't need to worry too much about it. However, the big downside of this approach is that if one calculation is much slower than the others, it will slow them down. If you do not process historic events, it shouldn't matter, as you'd size your application cluster to keep up with all events anyway. Another current downside is that if you have a failure in calculationTwo, the tasks in calculationOne are also restarted. The community is actively working to mitigate that, though.
The second way would allow only the affected source -> ... -> sink subtopology to be restarted. So if you expect frequent restarts or need to guarantee certain SLAs, this approach is better. An extension is to actually have separate Flink applications for each of these pipelines. You can share the same jar, but use different arguments to select the correct pipeline on submission. This approach also makes updating applications much easier, as you would only experience downtime for the pipeline that you actually modify.
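To make the "same jar, different arguments" idea concrete, here is a minimal sketch in plain Java. The PipelineSelector class, its register/select methods, and the pipeline names are all hypothetical, and the Runnable bodies are placeholders: in a real job each one would build its own source -> ... -> sink topology and then call execute() on the StreamExecutionEnvironment.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PipelineSelector {
    // Hypothetical registry: pipeline name -> code that builds that pipeline.
    private final Map<String, Runnable> pipelines = new LinkedHashMap<>();

    public PipelineSelector register(String name, Runnable builder) {
        pipelines.put(name, builder);
        return this;
    }

    // Runs the builder named by the first program argument and returns its name.
    public String select(String[] args) {
        if (args.length == 0 || !pipelines.containsKey(args[0])) {
            throw new IllegalArgumentException(
                "Expected one of " + pipelines.keySet() + " as the first argument");
        }
        pipelines.get(args[0]).run();
        return args[0];
    }

    public static void main(String[] args) {
        new PipelineSelector()
            // Placeholder bodies; a real builder would wire up the Flink topology here.
            .register("calculationOne", () -> System.out.println("building calculationOne"))
            .register("calculationTwo", () -> System.out.println("building calculationTwo"))
            .select(args);
    }
}
```

Each submission then passes the pipeline name as a program argument, so every pipeline runs as its own application and can be restarted or upgraded without touching the others.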
Upvotes: 1