Reputation: 33
I am checking whether a Flink SQL table with the Kafka connector can run in EXACTLY_ONCE mode. My approach is to create a table, set a reasonable checkpoint interval, apply a simple tumble window on an event-time field, and finally restart my program.
Here are my steps in detail:
1: Create a Kafka table
CREATE TABLE IF NOT EXISTS LOG_TABLE (
    id STRING,
    ...
    ...
    event_timestamp TIMESTAMP(3),
    WATERMARK FOR event_timestamp AS ....
)
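The WITH clause with the Kafka connector options is omitted above. A minimal sketch of what the full statement might look like when registered through the tableEnvironment created in step 2 below (topic, bootstrap servers, watermark delay, and format are placeholder assumptions):
// Hypothetical completion of the DDL above; all connector options are assumed values.
tableEnvironment.executeSql(
        "CREATE TABLE IF NOT EXISTS LOG_TABLE (" +
        "  id STRING," +
        "  event_timestamp TIMESTAMP(3)," +
        "  WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '5' SECOND" +
        ") WITH (" +
        "  'connector' = 'kafka'," +
        "  'topic' = 'log_topic'," +                            // assumed topic name
        "  'properties.bootstrap.servers' = 'localhost:9092'," + // assumed brokers
        "  'properties.group.id' = 'log_consumer'," +
        "  'scan.startup.mode' = 'group-offsets'," +
        "  'format' = 'json'" +                                  // assumed serialization format
        ")");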
2: Start my Flink job with the following config
StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
environment.getCheckpointConfig().setCheckpointInterval(30000L);
environment.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
environment.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///tmp/checkpoint/"));
environment.setStateBackend(new HashMapStateBackend());
environment.setParallelism(1);
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tableEnvironment = StreamTableEnvironment.create(environment, settings);
tableEnvironment.getConfig().getConfiguration().setBoolean("table.exec.emit.early-fire.enabled", true);
tableEnvironment.getConfig().getConfiguration().setString("table.exec.emit.early-fire.delay", "1s");
3: Execute my SQL
SELECT TUMBLE_END(event_timestamp, INTERVAL '5' MINUTE),
       COUNT(1)
FROM LOG_TABLE
GROUP BY TUMBLE(event_timestamp, INTERVAL '5' MINUTE)
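A minimal sketch of how this query can be submitted from the Table API, printing the rows locally just for illustration (a real deployment would presumably write to a sink table instead):
// Hypothetical: run the windowed aggregation and stream the result rows to stdout.
tableEnvironment.executeSql(
        "SELECT TUMBLE_END(event_timestamp, INTERVAL '5' MINUTE) AS window_end, " +
        "       COUNT(1) AS cnt " +
        "FROM LOG_TABLE " +
        "GROUP BY TUMBLE(event_timestamp, INTERVAL '5' MINUTE)")
    .print();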
As you can see, the tumble window interval is 5 minutes and the checkpoint interval is 30 seconds, so each tumble window spans 10 checkpoint intervals.
In this case the window state was lost:
All the messages were sent within 2 minutes, so after restarting from the checkpoint the job's output should have been 200, but the result was only 100, i.e. the job lost the first run's state. Is there a mistake in my process? Please help me check, thanks.
Upvotes: 1
Views: 1485
Reputation: 43454
Restarting a Flink job while preserving exactly-once guarantees requires launching the follow-on job in a special way so that the new job begins by restoring the state from the previous job. (Configuring the checkpoint storage directory, as you've done in step 2, isn't enough by itself.)
If you are using the SQL Client to launch the job, see Start a SQL Job from a savepoint, which involves doing something like this
SET 'execution.savepoint.path' = '/tmp/flink-savepoints/...';
before launching the query that needs the state to be restored.
If you are using the Table API, then the details depend on how you are launching the job, but you can use the command line with something like this
$ ./bin/flink run \
--detached \
--fromSavepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
./examples/streaming/StateMachineExample.jar
or you might be using the REST API, in which case you will POST to /jars/:jarid/run with a savepointPath configured.
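Since the job in your question is launched with createLocalEnvironmentWithWebUI rather than through the CLI or REST API, another option that should work (assuming a Flink version whose local execution honors execution.savepoint.path, which recent versions do as far as I know) is to put the restore path into the Configuration handed to the environment. A sketch, with a placeholder path:
// Sketch: restore a locally launched job from a retained checkpoint or a savepoint.
// Replace the placeholder path with an actual chk-N directory or savepoint directory.
Configuration conf = new Configuration();
conf.setString("execution.savepoint.path", "file:///tmp/checkpoint/<job-id>/chk-12");
StreamExecutionEnvironment environment =
        StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);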
Note that you can use a retained checkpoint rather than a savepoint for restarting or rescaling your jobs. Also note that if you change the query in ways that render the old state incompatible with the new query, then none of this is going to work. See FLIP-190 for more on that topic.
The Flink Operations Playground is a tutorial that covers this and related topics in more detail.
Upvotes: 2