slo

Reputation: 33

How to recover Flink Sql jobs from checkpoint?

I am checking whether a Flink SQL table with the Kafka connector can run in EXACTLY_ONCE mode. My approach is to create a table, set a reasonable checkpoint interval, apply a simple tumble function on an event_time field, and finally restart my program.

Here is my detailed process:

1: Create a Kafka table

CREATE TABLE IF NOT EXISTS LOG_TABLE (
   id STRING,
   ...
   ...
   event_timestamp TIMESTAMP(3),
   WATERMARK FOR event_timestamp AS ...
)
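(The WITH clause is elided above; for reference, a complete Kafka table definition issued through the Table API would look roughly like this sketch. The topic, brokers, format, and watermark delay are illustrative placeholders, not my actual settings.)

// Sketch only: all connector options below are placeholders.
tableEnvironment.executeSql(
        "CREATE TABLE IF NOT EXISTS LOG_TABLE (" +
        "  id STRING," +
        "  event_timestamp TIMESTAMP(3)," +
        "  WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '5' SECOND" +
        ") WITH (" +
        "  'connector' = 'kafka'," +
        "  'topic' = 'log_topic'," +                             // placeholder
        "  'properties.bootstrap.servers' = 'localhost:9092'," + // placeholder
        "  'scan.startup.mode' = 'earliest-offset'," +
        "  'format' = 'json'" +                                  // placeholder
        ")");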

2: Start my Flink job with the following config

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
// Checkpoint every 30s; retain checkpoints on cancellation so they can be reused.
environment.getCheckpointConfig().setCheckpointInterval(30000L);
environment.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
environment.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///tmp/checkpoint/"));
environment.setStateBackend(new HashMapStateBackend());
environment.setParallelism(1);
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

TableEnvironment tableEnvironment = StreamTableEnvironment.create(environment, settings);
// Emit early window results every second instead of only on window close.
tableEnvironment.getConfig().getConfiguration().setBoolean("table.exec.emit.early-fire.enabled", true);
tableEnvironment.getConfig().getConfiguration().setString("table.exec.emit.early-fire.delay", "1s");

3: Execute my SQL

SELECT TUMBLE_END(event_timestamp, INTERVAL '5' MINUTE),
       COUNT(1)
FROM LOG_TABLE
GROUP BY TUMBLE(event_timestamp, INTERVAL '5' MINUTE)
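For completeness, a query like this can be submitted from the Table API program above; a minimal sketch (result.print() blocks and streams the windowed counts back to the local client, which is enough for this test):

// Needs: import org.apache.flink.table.api.TableResult;
TableResult result = tableEnvironment.executeSql(
        "SELECT TUMBLE_END(event_timestamp, INTERVAL '5' MINUTE), COUNT(1) " +
        "FROM LOG_TABLE " +
        "GROUP BY TUMBLE(event_timestamp, INTERVAL '5' MINUTE)");
result.print();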

As shown, the tumble window interval is 5 minutes and the checkpoint interval is 30 seconds, so roughly ten checkpoints are taken during each tumble window.

In this case, the window state was lost:

  1. 2:00:00 pm, launch the job and send 100 messages. (The job id is bd208afa6599864831f008d429a527bb; chk-1 through chk-3 trigger successfully and checkpoint files appear in the checkpoint directory.)
  2. 2:01:40 pm, shut down the job and change the CheckpointStorage directory to /tmp/checkpoint/bd208afa6599864831f008d429a527bb/chk-3.
  3. 2:02:00 pm, restart the job and send another 100 messages.

All 200 messages were sent within two minutes, i.e. inside the same 5-minute tumble window, so after restarting from the checkpoint the job's output should be 200. Instead the result was 100: the job lost the first run's state. Is there any mistake in my process? Please help me check, thanks.

Upvotes: 1

Views: 1485

Answers (1)

David Anderson

Reputation: 43454

Restarting a Flink job while preserving exactly-once guarantees requires launching the follow-on job in a special way so that the new job begins by restoring the state from the previous job. (Modifying the checkpoint storage directory, as you've done in step 2, isn't helpful.)

If you are using the SQL Client to launch the job, see Start a SQL Job from a savepoint, which involves doing something like this

SET 'execution.savepoint.path' = '/tmp/flink-savepoints/...';

before launching the query that needs the state to be restored.

If you are using the Table API, then the details depend on how you are launching the job, but you can use the command line with something like this

$ ./bin/flink run \
      --detached \
      --fromSavepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
      ./examples/streaming/StateMachineExample.jar
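
If you launch the job programmatically, as in your step 2, you can also hand the restore path to the environment through its configuration. A minimal sketch, assuming a Flink version (1.13+) in which execution.savepoint.path is honored when the environment is built from a Configuration; the chk-3 path is the retained checkpoint from your first run:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Restore state from the retained checkpoint of the previous run,
// instead of pointing the checkpoint *storage* directory at it.
Configuration conf = new Configuration();
conf.setString("execution.savepoint.path",
        "file:///tmp/checkpoint/bd208afa6599864831f008d429a527bb/chk-3");
StreamExecutionEnvironment environment =
        StreamExecutionEnvironment.getExecutionEnvironment(conf);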

or you might be using the REST API, in which case you will POST to /jars/:jarid/run with a savepointPath configured.
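
For the REST route, a hedged sketch of that call (host, port, jar id, and savepoint path are illustrative; the jar id is whatever POST /jars/upload returned):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RunJarFromSavepoint {
    public static void main(String[] args) throws Exception {
        String jarId = "d9ab7351-...-StateMachineExample.jar"; // placeholder
        String body = "{\"savepointPath\": \"/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab\"}";
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8081/jars/" + jarId + "/run"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body()); // {"jobid":"..."} on success
    }
}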

Note that you can use a retained checkpoint rather than a savepoint for restarting or rescaling your jobs. Also note that if you change the query in ways that render the old state incompatible with the new query, then none of this is going to work. See FLIP-190 for more on that topic.

The Flink Operations Playground is a tutorial that covers this and related topics in more detail.

Upvotes: 2
