Reputation: 73
I'm trying to modify an existing savepoint, created with Flink 1.12.2 and Ververica 2.4.1, that was saved to S3.
The steps that I took are the following:
BootstrapTransformation<AccountRegistrationInformation> transformation = OperatorTransformation
        .bootstrapWith(accountDataSet)
        .keyBy(acc -> acc.getBrand() + "-" + acc.getAccountId())
        .transform(new AccountRegistrationBootstrapper());

Savepoint.load(executionEnvironment, "C:\\flinkState", new MemoryStateBackend())
        .removeOperator("registration-processor")
        .withOperator("registration-processor", transformation)
        .write("C:\\flinkState\\transformed");

executionEnvironment.execute();
When I run the above code, it amends only a subset of the dataset, and then Flink throws the following exception:
Caused by: java.io.FileNotFoundException: \<redacted>\savepoint-c680a3-c178150a8b8d\32c44059-1f59-4091-bcb5-3e1efa369ec6 (The system cannot find the path specified)
When inspecting the _metadata file, I noticed that it contains absolute S3 paths:
s3://<redacted>/savepoint-c680a3-c178150a8b8d/32c44059-1f59-4091-bcb5-3e1efa369ec6
What I want is to save the amended savepoint to my local machine and then move it over to S3 manually, so that Flink can start with the amended state.
Can anybody share their experience with this?
Full exception:
10:09:25,169 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend [] - Initializing heap keyed state backend with stream factory.
10:09:25,170 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder [] - Finished to build heap keyed state-backend.
10:09:25,171 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend [] - Initializing heap keyed state backend with stream factory.
10:09:25,176 INFO org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - Converting recovered input channels (1 channels)
10:09:25,178 ERROR org.apache.flink.runtime.operators.DataSinkTask [] - Error in user code: \<redacted>\savepoints\d18b311a-86e8-4406-93b5-f2b398c4257f\savepoint-c680a3-c178150a8b8d\32c44059-1f59-4091-bcb5-3e1efa369ec6 (The system cannot find the path specified): DataSink (org.apache.flink.state.api.output.FileCopyFunction@da28d03) (1/1)
java.io.FileNotFoundException: \<redacted>\savepoints\d18b311a-86e8-4406-93b5-f2b398c4257f\savepoint-c680a3-c178150a8b8d\32c44059-1f59-4091-bcb5-3e1efa369ec6 (The system cannot find the path specified)
at java.io.FileInputStream.open0(Native Method) ~[?:1.8.0_282]
at java.io.FileInputStream.open(FileInputStream.java:195) ~[?:1.8.0_282]
at java.io.FileInputStream.<init>(FileInputStream.java:138) ~[?:1.8.0_282]
at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50) ~[flink-core-1.12.2.jar:1.12.2]
at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:134) ~[flink-core-1.12.2.jar:1.12.2]
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:87) ~[flink-core-1.12.2.jar:1.12.2]
at org.apache.flink.state.api.output.FileCopyFunction.writeRecord(FileCopyFunction.java:61) ~[flink-state-processor-api_2.11-1.12.2.jar:1.12.2]
at org.apache.flink.state.api.output.FileCopyFunction.writeRecord(FileCopyFunction.java:34) ~[flink-state-processor-api_2.11-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:235) [flink-runtime_2.11-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-runtime_2.11-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-runtime_2.11-1.12.2.jar:1.12.2]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
10:09:25,223 WARN org.apache.flink.runtime.taskmanager.Task [] - DataSink (org.apache.flink.state.api.output.FileCopyFunction@da28d03) (1/1)#0 (d4b998c90a0fc21a64f463b6476e85aa) switched from RUNNING to FAILED.
java.io.FileNotFoundException: \<redacted>\savepoints\d18b311a-86e8-4406-93b5-f2b398c4257f\savepoint-c680a3-c178150a8b8d\32c44059-1f59-4091-bcb5-3e1efa369ec6 (The system cannot find the path specified)
at java.io.FileInputStream.open0(Native Method) ~[?:1.8.0_282]
at java.io.FileInputStream.open(FileInputStream.java:195) ~[?:1.8.0_282]
at java.io.FileInputStream.<init>(FileInputStream.java:138) ~[?:1.8.0_282]
at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50) ~[flink-core-1.12.2.jar:1.12.2]
at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:134) ~[flink-core-1.12.2.jar:1.12.2]
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:87) ~[flink-core-1.12.2.jar:1.12.2]
at org.apache.flink.state.api.output.FileCopyFunction.writeRecord(FileCopyFunction.java:61) ~[flink-state-processor-api_2.11-1.12.2.jar:1.12.2]
at org.apache.flink.state.api.output.FileCopyFunction.writeRecord(FileCopyFunction.java:34) ~[flink-state-processor-api_2.11-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:235) ~[flink-runtime_2.11-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-runtime_2.11-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-runtime_2.11-1.12.2.jar:1.12.2]
10:09:25,224 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for DataSink (org.apache.flink.state.api.output.FileCopyFunction@da28d03) (1/1)#0 (d4b998c90a0fc21a64f463b6476e85aa).
10:09:25,255 INFO org.apache.flink.runtime.taskmanager.Task [] - MapPartition (2861c3d1e95af557df2962264aaf94ef) (6/8)#0 (ab4fcd08aa51c77eec1ac6d3c9fba2d3) switched from RUNNING to FINISHED.
10:09:25,255 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for MapPartition (2861c3d1e95af557df2962264aaf94ef) (6/8)#0 (ab4fcd08aa51c77eec1ac6d3c9fba2d3).
10:09:25,255 INFO org.apache.flink.runtime.taskmanager.Task [] - MapPartition (2861c3d1e95af557df2962264aaf94ef) (8/8)#0 (c0105262e4e271633df686c1b09476a9) switched from RUNNING to FINISHED.
10:09:25,256 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for MapPartition (2861c3d1e95af557df2962264aaf94ef) (8/8)#0 (c0105262e4e271633df686c1b09476a9).
Upvotes: 1
Views: 435
Reputation: 94
The absolute path in _metadata could be a pointer to inline state, i.e., state stored directly in _metadata. State stored in separate data files should have relative paths.
What do you actually use in place of 'C:\flinkState' in your code, and what is behind '<redacted>' in the FileNotFoundException? If they are sensitive, can you provide an example of them?
Also, did you try on a Linux machine?
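To check which absolute paths the _metadata file actually contains, a rough diagnostic like the one below may help. It is only a sketch and does not use any Flink API: it treats _metadata as raw bytes and prints every printable-ASCII run that starts with "s3://". The class name MetadataPathScan and the method findPaths are made up for this example, and because the file is binary, a reported path may carry a stray trailing character.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: scans a binary file for path-like strings.
public class MetadataPathScan {

    /** Returns every run of printable ASCII in data that starts with prefix. */
    public static List<String> findPaths(byte[] data, String prefix) {
        if (prefix.isEmpty()) {
            throw new IllegalArgumentException("prefix must not be empty");
        }
        // ISO_8859_1 maps each byte to exactly one char, so no bytes are lost or merged.
        String text = new String(data, StandardCharsets.ISO_8859_1);
        List<String> found = new ArrayList<>();
        int from = 0;
        while (true) {
            int start = text.indexOf(prefix, from);
            if (start < 0) {
                break;
            }
            int end = start;
            // Extend the match until the first non-printable or whitespace byte.
            while (end < text.length() && text.charAt(end) > 0x20 && text.charAt(end) < 0x7f) {
                end++;
            }
            found.add(text.substring(start, end));
            from = end;
        }
        return found;
    }

    public static void main(String[] args) throws IOException {
        // Assumption: the path to the savepoint's _metadata file is passed as the first argument.
        Path metadata = Paths.get(args.length > 0 ? args[0] : "_metadata");
        for (String path : findPaths(Files.readAllBytes(metadata), "s3://")) {
            System.out.println(path);
        }
    }
}
```

If this prints entries that match the file name in the FileNotFoundException, the savepoint refers to those files by absolute location rather than relative to the savepoint directory.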
Update:
The added stack trace is similar to the one in https://issues.apache.org/jira/browse/FLINK-23429. Could you try using the state processor API dependency from Flink 1.12.5 in your savepoint transformation job?
Upvotes: 1