Reputation: 1809
I've set up my very first toy Flink job, and I want to do a very simple thing: continuously read a local file and print its content.
The problem is that each time I update the local file, Flink prints all of its lines again. I want it to print only the newly added lines.
Code snippet:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String path = "/home/foobar/input";
TextInputFormat inputFormat = new TextInputFormat(new Path(path));
inputFormat.setCharsetName("UTF-8");
// Re-scan the path every 60 seconds (the interval is given in milliseconds).
DataStreamSource<String> ds = env.readFile(inputFormat, path,
        FileProcessingMode.PROCESS_CONTINUOUSLY, 60000L, BasicTypeInfo.STRING_TYPE_INFO);
ds.print();
env.execute("jobname02");
Does anyone know what I am doing wrong here? Thanks for the help.
Upvotes: 2
Views: 4627
Reputation: 43419
You're not doing anything wrong. This is the documented behavior of the PROCESS_CONTINUOUSLY mode:
If the watchType is set to FileProcessingMode.PROCESS_CONTINUOUSLY, when a file is modified, its contents are re-processed entirely. This can break the “exactly-once” semantics, as appending data at the end of a file will lead to all its contents being re-processed.
This mode is more useful when applied to a directory, into which you atomically move files once they have been completely written.
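As a rough illustration of the "atomically move" pattern, here is a minimal sketch of the producer side using only the JDK. The class name AtomicFileDrop, the publish method, and the staging/input directory names are made up for this example. It assumes the staging directory and the watched directory are on the same filesystem, since an atomic move can otherwise fail with AtomicMoveNotSupportedException.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;

// Hypothetical helper: not part of Flink, only illustrates the producer side.
public class AtomicFileDrop {

    /**
     * Writes the lines to a temporary file in stagingDir, then atomically
     * moves the finished file into watchedDir under fileName. A job that
     * monitors watchedDir should therefore never see a half-written file.
     */
    public static Path publish(Path stagingDir, Path watchedDir,
                               String fileName, List<String> lines) throws IOException {
        Files.createDirectories(stagingDir);
        Files.createDirectories(watchedDir);

        // Write the complete content outside the watched directory first.
        Path tmp = Files.createTempFile(stagingDir, "part-", ".tmp");
        Files.write(tmp, lines, StandardCharsets.UTF_8);

        // Assumption: both directories are on the same filesystem.
        Path target = watchedDir.resolve(fileName);
        return Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("flink-input-demo");
        Path published = publish(base.resolve("staging"), base.resolve("input"),
                "batch-0001.txt", List.of("line 1", "line 2"));
        System.out.println("Published " + published);
    }
}
```

With this approach, each batch of new lines goes into its own new file, and the readFile call points at the directory (here, the hypothetical input directory) rather than at a single file that keeps growing.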
Upvotes: 4