I am trying to learn Apache Flink, and the use case below seems so simple that Flink not supporting it makes me think I am misunderstanding something fundamental.
The use case is this simple example from the Flink docs.
Say we have these static elements that I want to filter out, plus dynamic words that stream in continuously.
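```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class ControlExample {   // enclosing class name is arbitrary

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Control words to filter out, keyed by the word itself.
        DataStream<String> control = env
            .fromElements("DROP", "IGNORE")
            .keyBy(x -> x);

        // Data words, keyed the same way so a word and its control element share keyed state.
        DataStream<String> streamOfWords = env
            .fromElements("Apache", "DROP", "Flink", "IGNORE")
            .keyBy(x -> x);

        control
            .connect(streamOfWords)
            .flatMap(new ControlFunction())
            .print();

        env.execute();
    }

    public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
        // Per-key flag: non-null once a control element for this word has been seen.
        private ValueState<Boolean> blocked;

        @Override
        public void open(Configuration config) {
            blocked = getRuntimeContext()
                .getState(new ValueStateDescriptor<>("blocked", Boolean.class));
        }

        @Override
        public void flatMap1(String control_value, Collector<String> out) throws Exception {
            // Control input: mark this key as blocked.
            blocked.update(Boolean.TRUE);
        }

        @Override
        public void flatMap2(String data_value, Collector<String> out) throws Exception {
            // Data input: emit the word only if no control element has been seen for it yet.
            if (blocked.value() == null) {
                out.collect(data_value);
            }
        }
    }
}
```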
The documentation states:
> It is important to recognize that you have no control over the order in which the flatMap1 and flatMap2 callbacks are called. ... In cases where timing and/or ordering matter, you may find it necessary to buffer events in managed Flink state until your application is ready to process them.
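To see concretely why this matters for `ControlFunction` above, here is a minimal plain-Java model (no Flink dependency) that replays two possible interleavings of the same events. It is a sketch under assumptions: the class `OrderingDemo` and the `C:`/`W:` tagging are hypothetical, a `HashMap` keyed by the word stands in for the keyed `blocked` state, and Flink's parallelism and networking are ignored.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of ControlFunction with no Flink dependency (hypothetical helper).
// A HashMap keyed by the word stands in for the keyed ValueState<Boolean> "blocked".
public class OrderingDemo {

    // Replays one interleaving of the two inputs. "C:" marks a control element
    // (handled like flatMap1), "W:" marks a data word (handled like flatMap2).
    static List<String> run(List<String> events) {
        Map<String, Boolean> blocked = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String event : events) {
            String value = event.substring(2);
            if (event.startsWith("C:")) {
                blocked.put(value, Boolean.TRUE);   // flatMap1: mark the key as blocked
            } else if (blocked.get(value) == null) {
                out.add(value);                     // flatMap2: emit if not (yet) blocked
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Control elements happen to be processed first: DROP and IGNORE are filtered.
        System.out.println(run(List.of("C:DROP", "C:IGNORE",
                "W:Apache", "W:DROP", "W:Flink", "W:IGNORE")));   // prints [Apache, Flink]

        // Words happen to be processed first: nothing is blocked yet, so every word passes.
        System.out.println(run(List.of("W:Apache", "W:DROP", "W:Flink", "W:IGNORE",
                "C:DROP", "C:IGNORE")));                          // prints [Apache, DROP, Flink, IGNORE]
    }
}
```

The same function and the same input elements give different results depending only on which callback runs first for each key.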
How would one go about filtering those words in real time if we don't know the order in which they are consumed?
Am I fundamentally misunderstanding how, and for what, Apache Flink should be used?
Should I just store my control words outside the Flink APIs, for example in a local variable, and use the Flink APIs only for the streaming words? Or what is the Flink-appropriate way to handle such a case?
There is no perfect solution for joining two streams in a true streaming environment because, as the docs note:
> you have no control over the order in which the flatMap1 and flatMap2 callbacks are called

So there is always a tradeoff between completeness/accuracy and latency, and you, as the application developer, have to decide where to strike it.
The standard solution is to use Flink state (e.g. `ListState`) to buffer incoming elements (the `streamOfWords` in your example) that don't yet match anything in the filter state, together with a timer that fires once you've waited "long enough". If a matching control element arrives in the meantime, you discard the buffered elements for that key; when the timer fires, you emit whatever is still buffered.
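To make the buffer-and-timer idea concrete, here is a plain-Java model of it with no Flink dependency, so it runs on its own. It is a sketch under assumptions, not Flink code: per-key `HashMap`s stand in for keyed `ValueState<Boolean>` and `ListState<String>`, an explicit `advanceTo(now)` call stands in for Flink's timer service, and the names `BufferedFilter`, `onControl`, `onWord`, `advanceTo`, and `waitMillis` are hypothetical. In a real job the three methods would correspond roughly to `processElement1`, `processElement2`, and `onTimer` of a `KeyedCoProcessFunction`, with timers registered through `ctx.timerService().registerProcessingTimeTimer(...)`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the buffer-and-timer approach (no Flink dependency, hypothetical names).
// Per-key maps stand in for Flink keyed state: `blocked` ~ ValueState<Boolean>,
// `buffer` ~ ListState<String>, `timers` ~ one pending processing-time timer per key.
public class BufferedFilter {
    private final long waitMillis;   // how long counts as "long enough" (assumed knob)
    private final Map<String, Boolean> blocked = new HashMap<>();
    private final Map<String, List<String>> buffer = new HashMap<>();
    private final Map<String, Long> timers = new LinkedHashMap<>();  // insertion order = fire order here
    private final List<String> out = new ArrayList<>();

    public BufferedFilter(long waitMillis) {
        this.waitMillis = waitMillis;
    }

    // Control input (would be processElement1): block the key and discard
    // anything buffered for it while we were waiting.
    public void onControl(String word) {
        blocked.put(word, Boolean.TRUE);
        buffer.remove(word);
        timers.remove(word);
    }

    // Word input (would be processElement2): drop it if the key is already blocked,
    // otherwise buffer it and schedule a timer unless one is already pending.
    public void onWord(String word, long now) {
        if (blocked.containsKey(word)) {
            return;
        }
        buffer.computeIfAbsent(word, k -> new ArrayList<>()).add(word);
        timers.putIfAbsent(word, now + waitMillis);
    }

    // Stand-in for the timer service (would be onTimer): fire every timer that is
    // due at `now` and emit whatever is still buffered for that key.
    public void advanceTo(long now) {
        List<String> due = new ArrayList<>();
        for (Map.Entry<String, Long> timer : timers.entrySet()) {
            if (timer.getValue() <= now) {
                due.add(timer.getKey());
            }
        }
        for (String key : due) {
            timers.remove(key);
            List<String> pending = buffer.remove(key);
            if (pending != null) {
                out.addAll(pending);
            }
        }
    }

    public List<String> output() {
        return out;
    }

    public static void main(String[] args) {
        BufferedFilter filter = new BufferedFilter(1000);
        // Worst case for a naive filter: all words arrive before any control element.
        filter.onWord("Apache", 0);
        filter.onWord("DROP", 10);
        filter.onWord("Flink", 20);
        filter.onWord("IGNORE", 30);
        // The control elements arrive late, but still within the wait window.
        filter.onControl("DROP");
        filter.onControl("IGNORE");
        filter.advanceTo(2000);
        System.out.println(filter.output());   // prints [Apache, Flink]
    }
}
```

Here `waitMillis` is the completeness/latency knob: every word that is not blocked is held back for up to that long, and a control element that arrives after the timer has already fired cannot retract a word that was already emitted.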
Note that you could also do the above using the Table API, which supports temporal joins.