gfytd

Reputation: 1809

Flink Stream, how to read file incrementally?

I've set up my very first toy Flink job, and I want to do something very simple: continuously read a local file and print its content.

The problem is that each time I update that local file, Flink prints all of its lines again. I want it to print only the newly added lines.

Code snippet:

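import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String path = "/home/foobar/input";
TextInputFormat inputFormat = new TextInputFormat(new Path(path));
inputFormat.setCharsetName("UTF-8");
// Re-scan the path every 60000 ms (60 seconds) for changes.
DataStreamSource<String> ds = env.readFile(inputFormat, path,
                FileProcessingMode.PROCESS_CONTINUOUSLY, 60000L, BasicTypeInfo.STRING_TYPE_INFO);
ds.print();
env.execute("jobname02");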

Does anyone know what I am doing wrong here? Thanks for the help.

Upvotes: 2

Views: 4627

Answers (1)

David Anderson

Reputation: 43419

You're not doing anything wrong. This is the documented behavior of the PROCESS_CONTINUOUSLY mode:

If the watchType is set to FileProcessingMode.PROCESS_CONTINUOUSLY, when a file is modified, its contents are re-processed entirely. This can break the “exactly-once” semantics, as appending data at the end of a file will lead to all its contents being re-processed.

This mode is more useful when applied to a directory, into which you atomically move files once they have been completely written.
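
To illustrate the "atomically move" part, here is a minimal sketch of the producer side in plain Java (no Flink API involved). The class name AtomicPublish, the publish helper, and the staging/input directory names are made up for the example. The idea is to write each file somewhere the job is not watching, then move the finished file into the watched directory in one step, so the job should never see a half-written file or a later append.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Flink: stages a file, then publishes it atomically.
public class AtomicPublish {

    /**
     * Writes the lines to a file in stagingDir, then moves the finished file
     * into watchedDir in a single step. Returns the path of the published file.
     */
    static Path publish(Path stagingDir, Path watchedDir, String fileName, List<String> lines)
            throws IOException {
        Files.createDirectories(stagingDir);
        Files.createDirectories(watchedDir);

        // 1. Write the complete file outside the watched directory.
        Path staged = stagingDir.resolve(fileName);
        Files.write(staged, lines, StandardCharsets.UTF_8);

        // 2. Move it into place. ATOMIC_MOVE throws AtomicMoveNotSupportedException
        //    if the file system cannot do this atomically (e.g. across file systems).
        return Files.move(staged, watchedDir.resolve(fileName), StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        // Assumption: a temp directory stands in for the real parent of the watched directory.
        Path base = Files.createTempDirectory("flink-input-demo");
        String fileName = "batch-" + System.currentTimeMillis() + ".txt";
        Path published = publish(base.resolve("staging"), base.resolve("input"),
                fileName, Arrays.asList("line 1", "line 2"));
        System.out.println("published " + published);
    }
}
```

With this approach each batch of new lines goes into its own uniquely named file rather than being appended to an existing one, and the path passed to readFile would be the watched directory instead of a single file. Keeping the staging directory on the same file system as the watched one is what normally makes the atomic move possible.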

Upvotes: 4
