D P

Reputation: 41

How does Flink's readFile API maintain state?

I'm reading a single CSV file from S3 using the DataStream readFile API:

    RowCsvInputFormat inputFormat = new RowCsvInputFormat(...);
    SingleOutputStreamOperator<Row> ds = env.readFile(
      inputFormat,
      "s3://bucket/somefile.csv",
      FileProcessingMode.PROCESS_CONTINUOUSLY,
      60000L); // monitoring interval in milliseconds

This works as expected when the app is first run. Once the file has been fully read, the operator stays in the RUNNING state, which is expected with PROCESS_CONTINUOUSLY. My understanding from the docs is that Flink will not re-process the file as long as it hasn't been modified.

However, when I take a savepoint (after the file has been fully read) and restart the app from that savepoint, the app processes the file again. Is this expected? I'm running Flink 1.13.2.

For context: I'm using a file to bootstrap my application with historical data. I'd like to use PROCESS_CONTINUOUSLY because PROCESS_ONCE switches the operator to FINISHED once the file is read, which prevents savepoints from being generated (see https://issues.apache.org/jira/browse/FLINK-2491).

Upvotes: 1

Views: 288

Answers (1)

D P

Reputation: 41

It seems this works if the path passed to readFile is the directory containing the file (e.g. s3://bucket/) rather than the file itself. After switching to the directory, Flink no longer re-processes the file on restart from the savepoint, as long as the file hasn't been modified.
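The behaviour I now see with the directory path is consistent with a monitor that checkpoints the newest modification time it has forwarded and, after a restore, skips anything that is not newer. Below is a minimal stand-alone sketch of that idea in plain Java. It is a simplified model, not Flink's code: the class `ModTimeTracker` and its methods are hypothetical names made up for illustration, and it does not explain why a single-file path behaves differently.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-alone model of modification-time tracking; not part of Flink. */
public class ModTimeTracker {
    // Newest modification time already forwarded; the only state that is kept.
    private long lastSeenModTime;

    /** Start from a restored value, or Long.MIN_VALUE for a fresh start. */
    public ModTimeTracker(long restoredModTime) {
        this.lastSeenModTime = restoredModTime;
    }

    /** Returns the paths whose modification time is newer than the tracked one. */
    public List<String> scan(Map<String, Long> listing) {
        List<String> toProcess = new ArrayList<>();
        long newest = lastSeenModTime;
        for (Map.Entry<String, Long> entry : listing.entrySet()) {
            if (entry.getValue() > lastSeenModTime) {
                toProcess.add(entry.getKey());
                newest = Math.max(newest, entry.getValue());
            }
        }
        lastSeenModTime = newest;
        return toProcess;
    }

    /** The value that would be written into a savepoint in this model. */
    public long snapshot() {
        return lastSeenModTime;
    }

    public static void main(String[] args) {
        Map<String, Long> listing = new LinkedHashMap<>();
        listing.put("somefile.csv", 1000L);

        ModTimeTracker first = new ModTimeTracker(Long.MIN_VALUE);
        System.out.println(first.scan(listing));    // [somefile.csv]
        System.out.println(first.scan(listing));    // []

        // "Restart from savepoint": a new tracker seeded with the saved state.
        ModTimeTracker restored = new ModTimeTracker(first.snapshot());
        System.out.println(restored.scan(listing)); // []

        // A modified file is picked up again in full.
        listing.put("somefile.csv", 2000L);
        System.out.println(restored.scan(listing)); // [somefile.csv]
    }
}
```

In this model the file is skipped after a restore only because the saved modification time comes back with the savepoint; a tracker started from `Long.MIN_VALUE` would read everything again.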

Upvotes: 1
