Reputation: 11
I'm trying to write a simple word count program using Apache Flink as I'm learning it.
The problem is that I can't get rid of duplicate key tuples in my results.
Input:
a
aaa
ab
aaa
a
a
Output:
(a,1)
(a,2)
(a,3)
(aaa,1)
(aaa,2)
(ab,1)
Expected output:
(a,3)
(aaa,2)
(ab,1)
My code:
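import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WordCount {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> text = env.readTextFile("data.in");
        DataStream<Tuple2<String, Integer>> counts = text
            .map(s -> Tuple2.of(s, 1))
            // the lambda erases the tuple's generic types, so declare them explicitly
            .returns(Types.TUPLE(Types.STRING, Types.INT))
            .keyBy(0)  // key by the word (tuple field 0)
            .sum(1);   // running sum of the count (tuple field 1)
        counts.print();
        env.execute();
    }
}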
Upvotes: 1
Views: 564
Reputation: 43524
Flink's streaming API isn't designed to produce the result you expect. Instead, the idea behind stream processing is that the input is potentially unbounded -- in other words, the input will arrive continuously, forever. In practice, yes, the input may terminate, but then again, maybe it won't.
Since Flink has no expectation that streaming inputs will ever terminate, it can't be expected to wait until the end to produce results. Instead, Flink's DataStream API is organized around the idea of continuous inputs that produce continuous results. Each new input event may produce an updated result.
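To make the "each new input event may produce an updated result" point concrete, here is a minimal plain-Java sketch of what a keyed running sum does. It is not Flink code; the class and method names (RunningCountSketch, runningCounts) are made up for illustration, and it only models the behavior on a single thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (NOT Flink code) of a keyed running sum.
// All names here are hypothetical and chosen only for illustration.
class RunningCountSketch {

    // Returns every update a keyed running sum would emit: one per input record.
    static List<String> runningCounts(List<String> words) {
        Map<String, Integer> state = new LinkedHashMap<>(); // per-key state
        List<String> emitted = new ArrayList<>();
        for (String word : words) {
            // Update the count for this key and emit the new value immediately,
            // because there is no way to know whether more input will follow.
            int updated = state.merge(word, 1, Integer::sum);
            emitted.add("(" + word + "," + updated + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "aaa", "ab", "aaa", "a", "a");
        runningCounts(input).forEach(System.out::println);
    }
}
```

Six input records give six output records, so every intermediate count for a key shows up. This sketch emits them in arrival order; a parallel Flink job may interleave the keys differently.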
There is, however, a way to accomplish what you want, while still using the DataStream API, but it's somewhat complex.
It turns out that when you use Flink with a bounded source of input, like a file, a signal is sent through the job graph once the end of that input is reached. You can, in fact, wait for this signal, and only then produce results.
This signal is actually a watermark whose value is MAX_WATERMARK. So what you can do is have a ProcessFunction set an event time timer for some point in the distant future. This timer will only fire when this special watermark comes along. In the meantime, the ProcessFunction should watch the stream and keep track of the latest result (for each key), which it will only collect to the output when the timer finally fires upon receipt of this extremely large watermark.
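The mechanics can be sketched without Flink on the classpath. The following is a plain-Java model of the pattern, not Flink code: FinalCountSketch, processElement, onWatermark, and output are names invented for this sketch, and MAX_WATERMARK here is just Long.MAX_VALUE standing in for Flink's end-of-input watermark. In a real job the same logic would roughly live in a process function on a keyed stream, using keyed state for the count and the timer service for the timer.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (NOT Flink code) of "emit only when the final watermark arrives".
// All names are hypothetical; this only illustrates the control flow.
class FinalCountSketch {
    // Assumption: stands in for the watermark Flink sends at the end of bounded input.
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    private final Map<String, Integer> latestCount = new LinkedHashMap<>(); // latest result per key
    private final Map<String, Long> timers = new LinkedHashMap<>();         // one event-time timer per key
    private final List<String> output = new ArrayList<>();

    // Called once per record: update the per-key result, but emit nothing yet.
    void processElement(String word) {
        latestCount.merge(word, 1, Integer::sum);
        // Timer far in the future; setting it again for the same key changes nothing.
        timers.put(word, MAX_WATERMARK);
    }

    // Called when a watermark arrives: fire every timer whose timestamp has been reached.
    void onWatermark(long watermark) {
        Iterator<Map.Entry<String, Long>> it = timers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> timer = it.next();
            if (timer.getValue() <= watermark) {
                String key = timer.getKey();
                output.add("(" + key + "," + latestCount.get(key) + ")");
                it.remove();
            }
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        FinalCountSketch fn = new FinalCountSketch();
        for (String word : new String[] {"a", "aaa", "ab", "aaa", "a", "a"}) {
            fn.processElement(word);
        }
        fn.onWatermark(1_000L);        // an ordinary watermark: no timer fires
        fn.onWatermark(MAX_WATERMARK); // end of bounded input: every timer fires
        fn.output().forEach(System.out::println);
    }
}
```

With the input from the question, nothing is emitted until the final watermark, and then each key produces exactly one record with its last count.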
Or you could just use Flink's DataSet API, which is organized around batch processing. Then you'll get exactly what you expected.
Upvotes: 3