Vasif

Reputation: 778

How to manually read data from Flink's checkpoint file and keep it in Java memory

We need to read data from our checkpoints manually for various reasons (say we need to change our state object/class structure, so we want to read, restore, and copy the data into a new type of object). Reading works fine, but when we try to keep/store the data in memory after deploying to the Flink cluster, we get an empty list/map. In the log we see that we read and add all our data to the list/map properly, but as soon as our method completes its work the data is lost and the list/map is empty :(

    private List<KeyedAssetTagWithConfig> keyedAssetsTagWithConfigs = new ArrayList<>();

    val env = ExecutionEnvironment.getExecutionEnvironment();
    val savepoint = Savepoint.load(env, checkpointSavepointLocation, new HashMapStateBackend());

    val keyedStateReaderFunction = new KeyedStateReaderFunctionImpl();

    savepoint.readKeyedState("my-uuid", keyedStateReaderFunction)
            .setParallelism(1)
            .output(new MyLocalCollectionOutputFormat<>(keyedAssetsTagWithConfigs));

    env.execute("MyJobName");

private static class KeyedStateReaderFunctionImpl extends KeyedStateReaderFunction<String, KeyedAssetTagWithConfig> {

    private MapState<String, KeyedAssetTagWithConfig> liveTagsValues;
    private Map<String, KeyedAssetTagWithConfig> keyToValues = new ConcurrentHashMap<>();

    @Override
    public void open(final Configuration parameters) throws Exception {
        liveTagsValues = getRuntimeContext().getMapState(ExpressionsProcessor.liveTagsValuesStateDescriptor);
    }

    @Override
    public void readKey(final String key, final Context ctx, final Collector<KeyedAssetTagWithConfig> out) throws Exception {
        liveTagsValues.iterator().forEachRemaining(entry -> {
            keyToValues.put(entry.getKey(), entry.getValue());
            log.info("key {} -> {} val", entry.getKey(), entry.getValue());
            out.collect(entry.getValue());
        });
    }

    public Map<String, KeyedAssetTagWithConfig> getKeyToValues() {
        return keyToValues;
    }
}
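
I haven't included MyLocalCollectionOutputFormat above; a condensed sketch of what it might look like, modeled on Flink's built-in org.apache.flink.api.java.io.LocalCollectionOutputFormat (so the details here are an assumption, not our actual class):

    // The target collection is reached through a static holder map, which
    // only exists in the JVM where the format object was constructed.
    public class MyLocalCollectionOutputFormat<T> extends RichOutputFormat<T> {

        private static final Map<Integer, Collection<?>> RESULT_HOLDER = new HashMap<>();

        private final int id;
        private transient ArrayList<T> taskResult; // rebuilt per task after deserialization

        public MyLocalCollectionOutputFormat(final Collection<T> out) {
            synchronized (RESULT_HOLDER) {
                this.id = RESULT_HOLDER.size();
                RESULT_HOLDER.put(this.id, out);
            }
        }

        @Override
        public void configure(final Configuration parameters) {}

        @Override
        public void open(final int taskNumber, final int numTasks) {
            this.taskResult = new ArrayList<>();
        }

        @Override
        public void writeRecord(final T record) {
            this.taskResult.add(record);
        }

        @Override
        @SuppressWarnings("unchecked")
        public void close() {
            synchronized (RESULT_HOLDER) {
                // in a remote task manager JVM this lookup finds nothing,
                // so the client's list never receives the records
                final Collection<T> target = (Collection<T>) RESULT_HOLDER.get(id);
                if (target != null) {
                    target.addAll(taskResult);
                }
            }
        }
    }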

As soon as this code executes I expect to have all the values inside the map returned by keyedStateReaderFunction.getKeyToValues(), but it comes back empty. However, I see in the logs that we are reading all of them properly. The keyedAssetsTagWithConfigs list that we collect the output into is empty as well.

If anyone has any idea it would be very helpful, because I am lost; I have never before put data into a map and then lost it :) When I serialize my map or list, write it to a text file, and then deserialize it from there (using Jackson), I see that my data exists, but this is not a solution, just a workaround.
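
The round trip that does work looks roughly like this (the file name is just an example, and I'm assuming KeyedAssetTagWithConfig is a plain POJO that Jackson can handle; ObjectMapper and TypeReference are the usual com.fasterxml.jackson classes):

    ObjectMapper mapper = new ObjectMapper();

    // inside the job, where the data actually exists, dump it to a file
    mapper.writeValue(new File("state-dump.json"), keyedAssetsTagWithConfigs);

    // later, in the process that actually needs the data, read it back
    List<KeyedAssetTagWithConfig> restored = mapper.readValue(
            new File("state-dump.json"),
            new TypeReference<List<KeyedAssetTagWithConfig>>() {});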

Thanks in advance

Upvotes: 1

Views: 1024

Answers (3)

Vasif

Reputation: 778

I found a solution: start the job in attached mode and collect the results in the main thread.

    val env = ExecutionEnvironment.getExecutionEnvironment();

    val configuration = env.getConfiguration();
    configuration.setBoolean(DeploymentOptions.ATTACHED, true);

...

    val myresults = dataSource.collect();
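
Put together, the read path looks roughly like this (same "my-uuid" and reader function as in the question; note that collect() triggers execution by itself, so there is no separate env.execute() call):

    val env = ExecutionEnvironment.getExecutionEnvironment();
    env.getConfiguration().setBoolean(DeploymentOptions.ATTACHED, true);

    val savepoint = Savepoint.load(env, checkpointSavepointLocation, new HashMapStateBackend());
    val dataSource = savepoint.readKeyedState("my-uuid", new KeyedStateReaderFunctionImpl());

    // collect() runs the job and ships each task's output back into this JVM,
    // so the list is populated here instead of in a remote task manager
    val myresults = dataSource.collect();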

Hope this helps somebody else, because I wasted a couple of days trying to find a solution.

Upvotes: 0

David Anderson

Reputation: 43499

You have an instance of KeyedStateReaderFunctionImpl in the Flink client which gets serialized and sent to each task manager. Each task manager then deserializes a copy of that KeyedStateReaderFunctionImpl and calls its open and readKey methods, and gradually builds up a private Map containing its share of the data extracted from the savepoint/checkpoint.

Meanwhile the original KeyedStateReaderFunctionImpl back in the Flink client has never had its open or readKey methods called, and doesn't hold any data.

In your case the parallelism is one, so there is only one task manager, but in general you will need to collect the output from each task manager and assemble the complete results from these pieces. These results are not available in the Flink client process because the work hasn't been done there.
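
One way to do that assembly is Flink's accumulator mechanism: each task registers an accumulator, the job manager merges the contributions, and the client reads the merged result off the JobExecutionResult. A minimal sketch, reusing the state descriptor from the question and assuming KeyedAssetTagWithConfig is serializable (ListAccumulator is org.apache.flink.api.common.accumulators.ListAccumulator):

    private static class AccumulatingReaderFunction
            extends KeyedStateReaderFunction<String, KeyedAssetTagWithConfig> {

        private MapState<String, KeyedAssetTagWithConfig> liveTagsValues;
        private ListAccumulator<KeyedAssetTagWithConfig> acc;

        @Override
        public void open(final Configuration parameters) {
            liveTagsValues = getRuntimeContext()
                    .getMapState(ExpressionsProcessor.liveTagsValuesStateDescriptor);
            // registered once per task; the job manager merges all tasks' lists
            acc = new ListAccumulator<>();
            getRuntimeContext().addAccumulator("read-values", acc);
        }

        @Override
        public void readKey(final String key, final Context ctx,
                            final Collector<KeyedAssetTagWithConfig> out) throws Exception {
            for (Map.Entry<String, KeyedAssetTagWithConfig> entry : liveTagsValues.entries()) {
                acc.add(entry.getValue());
                out.collect(entry.getValue());
            }
        }
    }

    // in the client: execute() returns the merged accumulators from all tasks
    JobExecutionResult result = env.execute("MyJobName");
    List<KeyedAssetTagWithConfig> all = result.getAccumulatorResult("read-values");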

Upvotes: 1

Gerard Garcia

Reputation: 1856

The code you show creates and submits a Flink job to be executed in its own environment orchestrated by the Flink framework: https://nightlies.apache.org/flink/flink-docs-stable/docs/concepts/flink-architecture/#flink-application-execution

The job runs independently of the code that builds and submits it, so when you call keyedStateReaderFunction.getKeyToValues(), you are calling the method on the object that was used to build the job, not on the actual object that ran in the Flink execution environment.

Your workaround seems like a valid option to me. You can then submit the file with your savepoint contents to your new job to recreate its state as you'd like.
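
For that last step, the same (DataSet-based) State Processor API can bootstrap the dumped values into a fresh savepoint. A rough sketch, where restoredValues, newStateDescriptor, and getKey() are placeholders for your own types:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<KeyedAssetTagWithConfig> data = env.fromCollection(restoredValues);

    BootstrapTransformation<KeyedAssetTagWithConfig> transformation =
            OperatorTransformation
                    .bootstrapWith(data)
                    .keyBy(KeyedAssetTagWithConfig::getKey)
                    .transform(new KeyedStateBootstrapFunction<String, KeyedAssetTagWithConfig>() {
                        private MapState<String, KeyedAssetTagWithConfig> state;

                        @Override
                        public void open(final Configuration parameters) {
                            state = getRuntimeContext().getMapState(newStateDescriptor);
                        }

                        @Override
                        public void processElement(final KeyedAssetTagWithConfig value,
                                                   final Context ctx) throws Exception {
                            state.put(value.getKey(), value);
                        }
                    });

    Savepoint
            .create(new HashMapStateBackend(), 128)  // 128 = max parallelism, pick your own
            .withOperator("my-uuid", transformation)
            .write(newSavepointPath);

    env.execute("bootstrap-new-savepoint");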

Upvotes: 1
