Reputation: 60
I am getting an Exception in thread "main" java.lang.IllegalAccessError: class org.apache.flink.state.api.runtime.SavepointLoader tried to access protected method org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(Ljava/lang/String;)Lorg/apache/flink/runtime/state/CompletedCheckpointStorageLocation; (org.apache.flink.state.api.runtime.SavepointLoader and org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage are in unnamed module of loader 'app')
Using flink 1.8. Using below maven repo :
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-state-processor-api_2.12</artifactId>
<version>1.9.1</version>
</dependency>
Source code snippet
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
ExistingSavepoint savepoint = Savepoint.load(bEnv, "/home/utlesh/Documents/savepoint", new MemoryStateBackend()) ;
savepoint.readListState("input-events-source-01", "Custom Source", TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>(){}));
Getting exception on second line which calls below function
public static ExistingSavepoint load(ExecutionEnvironment env, String path, StateBackend stateBackend) throws IOException {
org.apache.flink.runtime.checkpoint.savepoint.Savepoint savepoint = SavepointLoader.loadSavepoint(path);
...
...
}
Which calls below function :
package org.apache.flink.state.api.runtime;
public static Savepoint loadSavepoint(String savepointPath) throws IOException {
CompletedCheckpointStorageLocation location = AbstractFsCheckpointStorage
.resolveCheckpointPointer(savepointPath);
try (DataInputStream stream = new DataInputStream(location.getMetadataHandle().openInputStream())) {
return Checkpoints.loadCheckpointMetadata(stream, Thread.currentThread().getContextClassLoader());
}
}
which calls below function :
package org.apache.flink.runtime.state.filesystem;
protected static CompletedCheckpointStorageLocation resolveCheckpointPointer(String checkpointPointer) throws IOException {
checkNotNull(checkpointPointer, "checkpointPointer");
checkArgument(!checkpointPointer.isEmpty(), "empty checkpoint pointer");
...
...
}
If we see carefully, protected function of different package is called here. Is this a bug in flink maven repo or it's me using it wrong way ? Is there any other way to deserialize or read flink savepoint and checkpoint ?
Upvotes: 1
Views: 650
Reputation: 43499
The State Processor API can only be used in batch jobs running Flink 1.9 or greater, but it can be used to read savepoints and checkpoints that were written by streaming jobs running older versions of Flink (back to Flink 1.6).
Upvotes: 1
Reputation: 193
There seems to be an dependency version mismatch for your flink.
Add the below dependencies to the pom.xml and build again, also remove the old version dependency of the flink-clients from same file.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.9.1</version>
</dependency>
Upvotes: 1