Reputation: 41
I'm reading a single CSV file from S3 using the DataStream readFile API:
RowCsvInputFormat inputFormat = new RowCsvInputFormat(...);
SingleOutputStreamOperator<Row> ds = env.readFile(
        inputFormat,
        "s3://bucket/somefile.csv",
        FileProcessingMode.PROCESS_CONTINUOUSLY,
        60000L);
This works as expected when the app is first run. Once the file has been fully read, the operator stays in the RUNNING state, since the processing mode is PROCESS_CONTINUOUSLY (as expected). My understanding is that Flink will not re-process this file as long as it hasn't been modified since (per the docs).

However, when I generate a savepoint (after the file has been fully read) and restart the app by restoring from that savepoint, the app re-processes the file. Is this expected? I'm running Flink 1.13.2.
For context: I'm using a file to bootstrap my application with historical data. I'd like to use the PROCESS_CONTINUOUSLY mode because the PROCESS_ONCE mode switches the operator to FINISHED once the file is read, which prevents taking savepoints (see https://issues.apache.org/jira/browse/FLINK-2491).
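For reference, a minimal sketch of the PROCESS_ONCE variant I'm trying to avoid; the input format and S3 path are the same placeholders as above:

// PROCESS_ONCE reads the file a single time and the source then transitions to
// FINISHED, which on Flink 1.13 prevents savepoints from being taken for the job.
// The interval argument is ignored in this mode.
SingleOutputStreamOperator<Row> once = env.readFile(
        inputFormat,
        "s3://bucket/somefile.csv",
        FileProcessingMode.PROCESS_ONCE,
        60000L);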
Upvotes: 1
Views: 288
Reputation: 41
It seems this works if the path points to the directory containing the file rather than to the file itself. After updating the path to the directory, Flink does not re-process the file on restart as long as it hasn't been modified. A sketch of the change is below.
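A minimal sketch of the adjusted call, assuming the file sits in a hypothetical s3://bucket/csv-dir/ directory; the input format and monitoring interval stay as in the question:

// Point readFile at the containing directory instead of the single file. With
// PROCESS_CONTINUOUSLY, Flink monitors the directory and only (re)reads files
// whose modification timestamp is newer than the last one it processed, so an
// unmodified file is not re-processed after restoring from a savepoint.
SingleOutputStreamOperator<Row> ds = env.readFile(
        inputFormat,
        "s3://bucket/csv-dir/",
        FileProcessingMode.PROCESS_CONTINUOUSLY,
        60000L);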
Upvotes: 1