Utlesh Singh
Utlesh Singh

Reputation: 60

Getting Exception while inspecting flink savepoint using state processor api

I am getting an Exception in thread "main" java.lang.IllegalAccessError: class org.apache.flink.state.api.runtime.SavepointLoader tried to access protected method org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(Ljava/lang/String;)Lorg/apache/flink/runtime/state/CompletedCheckpointStorageLocation; (org.apache.flink.state.api.runtime.SavepointLoader and org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage are in unnamed module of loader 'app')

Using flink 1.8. Using below maven repo :

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-state-processor-api_2.12</artifactId>
      <version>1.9.1</version>
    </dependency>

Source code snippet

        ExecutionEnvironment bEnv   = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint savepoint = Savepoint.load(bEnv, "/home/utlesh/Documents/savepoint", new MemoryStateBackend()) ;
        savepoint.readListState("input-events-source-01", "Custom Source", TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>(){}));

Getting exception on second line which calls below function

    public static ExistingSavepoint load(ExecutionEnvironment env, String path, StateBackend stateBackend) throws IOException {
    org.apache.flink.runtime.checkpoint.savepoint.Savepoint savepoint = SavepointLoader.loadSavepoint(path);
    ...
    ...
}

Which calls below function :

    package org.apache.flink.state.api.runtime;

    public static Savepoint loadSavepoint(String savepointPath) throws IOException {
        CompletedCheckpointStorageLocation location = AbstractFsCheckpointStorage
            .resolveCheckpointPointer(savepointPath);

        try (DataInputStream stream = new DataInputStream(location.getMetadataHandle().openInputStream())) {
            return Checkpoints.loadCheckpointMetadata(stream, Thread.currentThread().getContextClassLoader());
        }
    }

which calls below function :

    package org.apache.flink.runtime.state.filesystem;

    protected static CompletedCheckpointStorageLocation resolveCheckpointPointer(String checkpointPointer) throws IOException {
        checkNotNull(checkpointPointer, "checkpointPointer");
        checkArgument(!checkpointPointer.isEmpty(), "empty checkpoint pointer");
       ...
       ...
}

If we see carefully, protected function of different package is called here. Is this a bug in flink maven repo or it's me using it wrong way ? Is there any other way to deserialize or read flink savepoint and checkpoint ?

Upvotes: 1

Views: 650

Answers (2)

David Anderson
David Anderson

Reputation: 43499

The State Processor API can only be used in batch jobs running Flink 1.9 or greater, but it can be used to read savepoints and checkpoints that were written by streaming jobs running older versions of Flink (back to Flink 1.6).

Upvotes: 1

Anurag Anand
Anurag Anand

Reputation: 193

There seems to be an dependency version mismatch for your flink.

Add the below dependencies to the pom.xml and build again, also remove the old version dependency of the flink-clients from same file.

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.11</artifactId>
      <version>1.9.1</version>
</dependency>

Upvotes: 1

Related Questions