Mihai Banu

Reputation: 73

Flink 1.12.2 modifying savepoint locally - metadata has absolute paths

I'm trying to modify an existing savepoint that was created with Flink 1.12.2 and Ververica 2.4.1 and stored on S3.

The steps that I took are the following:

  1. Copied the savepoint directory, containing the '_metadata' file and the savepoint data files, from S3 to my local machine;
  2. Opened the Flink state and read the state of the operator I'm interested in;
  3. Created and amended the dataset that I want to replace that operator's state with;
  4. Tried to replace the state with the following code:
BootstrapTransformation<AccountRegistrationInformation> transformation = OperatorTransformation
        .bootstrapWith(accountDataSet)
        .keyBy(acc -> acc.getBrand() + "-" + acc.getAccountId())
        .transform(new AccountRegistrationBootstrapper());

Savepoint.load(executionEnvironment, "C:\\flinkState", new MemoryStateBackend())
        .removeOperator("registration-processor")
        .withOperator("registration-processor", transformation)
        .write("C:\\flinkState\\transformed");

executionEnvironment.execute();

When I run the above code, it amends only a subset of the dataset, and then Flink throws the following exception:

Caused by: java.io.FileNotFoundException: \<redacted>\savepoint-c680a3-c178150a8b8d\32c44059-1f59-4091-bcb5-3e1efa369ec6 (The system cannot find the path specified)

When inspecting the _metadata, I noticed that it has absolute paths in S3:

s3://<redacted>/savepoint-c680a3-c178150a8b8d/32c44059-1f59-4091-bcb5-3e1efa369ec6
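One way to see which paths the _metadata file references is to scan it for strings that start with "s3://". The sketch below is only a heuristic: it assumes the paths appear as plain ASCII inside the binary file and does not parse the real savepoint format. The class name MetadataPathScan and the default file location are made up for illustration.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: heuristically lists path-like
// strings found in a savepoint _metadata file.
public class MetadataPathScan {

    // Collects every run of printable ASCII bytes that begins with the given prefix.
    public static List<String> findPaths(byte[] data, String prefix) {
        byte[] needle = prefix.getBytes(StandardCharsets.US_ASCII);
        List<String> found = new ArrayList<>();
        int i = 0;
        while (i + needle.length <= data.length) {
            if (matchesAt(data, i, needle)) {
                int end = i;
                // Extend the match up to the first non-printable byte (or end of data).
                while (end < data.length && data[end] > 0x20 && data[end] < 0x7f) {
                    end++;
                }
                found.add(new String(data, i, end - i, StandardCharsets.US_ASCII));
                i = end;
            } else {
                i++;
            }
        }
        return found;
    }

    private static boolean matchesAt(byte[] data, int offset, byte[] needle) {
        for (int j = 0; j < needle.length; j++) {
            if (data[offset + j] != needle[j]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        // Assumed location: the local copy of the savepoint from the question.
        Path metadata = Paths.get(args.length > 0 ? args[0] : "C:\\flinkState\\_metadata");
        if (!Files.exists(metadata)) {
            System.out.println("No file at " + metadata);
            return;
        }
        for (String path : findPaths(Files.readAllBytes(metadata), "s3://")) {
            System.out.println(path);
        }
    }
}
```

Because the scan stops at the first non-printable byte, a printed path may occasionally carry a stray trailing character if the byte that follows it in the file happens to be printable.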

What I want is to save the amended savepoint to my local machine and then move it over to S3 manually, so that Flink can start with the amended state.

Can anybody share their experience with this?

Full exception:

10:09:25,169 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend    [] - Initializing heap keyed state backend with stream factory.
10:09:25,170 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder [] - Finished to build heap keyed state-backend.
10:09:25,171 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend    [] - Initializing heap keyed state backend with stream factory.
10:09:25,176 INFO  org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - Converting recovered input channels (1 channels)
10:09:25,178 ERROR org.apache.flink.runtime.operators.DataSinkTask              [] - Error in user code: \<redacted>\savepoints\d18b311a-86e8-4406-93b5-f2b398c4257f\savepoint-c680a3-c178150a8b8d\32c44059-1f59-4091-bcb5-3e1efa369ec6 (The system cannot find the path specified):  DataSink (org.apache.flink.state.api.output.FileCopyFunction@da28d03) (1/1)
java.io.FileNotFoundException: \<redacted>\savepoints\d18b311a-86e8-4406-93b5-f2b398c4257f\savepoint-c680a3-c178150a8b8d\32c44059-1f59-4091-bcb5-3e1efa369ec6 (The system cannot find the path specified)
    at java.io.FileInputStream.open0(Native Method) ~[?:1.8.0_282]
    at java.io.FileInputStream.open(FileInputStream.java:195) ~[?:1.8.0_282]
    at java.io.FileInputStream.<init>(FileInputStream.java:138) ~[?:1.8.0_282]
    at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50) ~[flink-core-1.12.2.jar:1.12.2]
    at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:134) ~[flink-core-1.12.2.jar:1.12.2]
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:87) ~[flink-core-1.12.2.jar:1.12.2]
    at org.apache.flink.state.api.output.FileCopyFunction.writeRecord(FileCopyFunction.java:61) ~[flink-state-processor-api_2.11-1.12.2.jar:1.12.2]
    at org.apache.flink.state.api.output.FileCopyFunction.writeRecord(FileCopyFunction.java:34) ~[flink-state-processor-api_2.11-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:235) [flink-runtime_2.11-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-runtime_2.11-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-runtime_2.11-1.12.2.jar:1.12.2]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
10:09:25,223 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - DataSink (org.apache.flink.state.api.output.FileCopyFunction@da28d03) (1/1)#0 (d4b998c90a0fc21a64f463b6476e85aa) switched from RUNNING to FAILED.
java.io.FileNotFoundException: \<redacted>\savepoints\d18b311a-86e8-4406-93b5-f2b398c4257f\savepoint-c680a3-c178150a8b8d\32c44059-1f59-4091-bcb5-3e1efa369ec6 (The system cannot find the path specified)
    at java.io.FileInputStream.open0(Native Method) ~[?:1.8.0_282]
    at java.io.FileInputStream.open(FileInputStream.java:195) ~[?:1.8.0_282]
    at java.io.FileInputStream.<init>(FileInputStream.java:138) ~[?:1.8.0_282]
    at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50) ~[flink-core-1.12.2.jar:1.12.2]
    at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:134) ~[flink-core-1.12.2.jar:1.12.2]
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:87) ~[flink-core-1.12.2.jar:1.12.2]
    at org.apache.flink.state.api.output.FileCopyFunction.writeRecord(FileCopyFunction.java:61) ~[flink-state-processor-api_2.11-1.12.2.jar:1.12.2]
    at org.apache.flink.state.api.output.FileCopyFunction.writeRecord(FileCopyFunction.java:34) ~[flink-state-processor-api_2.11-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:235) ~[flink-runtime_2.11-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-runtime_2.11-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-runtime_2.11-1.12.2.jar:1.12.2]
10:09:25,224 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for DataSink (org.apache.flink.state.api.output.FileCopyFunction@da28d03) (1/1)#0 (d4b998c90a0fc21a64f463b6476e85aa).
10:09:25,255 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - MapPartition (2861c3d1e95af557df2962264aaf94ef) (6/8)#0 (ab4fcd08aa51c77eec1ac6d3c9fba2d3) switched from RUNNING to FINISHED.
10:09:25,255 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for MapPartition (2861c3d1e95af557df2962264aaf94ef) (6/8)#0 (ab4fcd08aa51c77eec1ac6d3c9fba2d3).
10:09:25,255 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - MapPartition (2861c3d1e95af557df2962264aaf94ef) (8/8)#0 (c0105262e4e271633df686c1b09476a9) switched from RUNNING to FINISHED.
10:09:25,256 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for MapPartition (2861c3d1e95af557df2962264aaf94ef) (8/8)#0 (c0105262e4e271633df686c1b09476a9).

Upvotes: 1

Views: 435

Answers (1)

qinjunjerry

Reputation: 94

An absolute path in the _metadata file could be a pointer to inline state, i.e., state that is stored directly in _metadata. State stored in separate data files should be referenced by relative paths.

What path do you actually use in place of 'C:\flinkState' in your code, and what does '<redacted>' stand for in the FileNotFoundException? If they are sensitive, can you provide an example of them?

Also, did you try on a Linux machine?

Update:

The added stack trace is similar to the one in https://issues.apache.org/jira/browse/FLINK-23429. Could you try using the State Processor API dependency from Flink 1.12.5 in your savepoint transformation job?
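If the transformation job is built with Maven, the change might look like the fragment below. This is a sketch under assumptions: the artifact name and Scala suffix are taken from the jar in the stack trace (flink-state-processor-api_2.11), and the other Flink dependencies in the job may need to be aligned with the same version.

```xml
<!-- Assumed Maven build; only the version differs from the 1.12.2 jar seen in the stack trace. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-state-processor-api_2.11</artifactId>
    <version>1.12.5</version>
</dependency>
```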

Upvotes: 1
