UberHans

Reputation: 797

Flink application is not receiving and processing the events from Kinesis connector generated when it was down

The problem: our Flink application does not receive or process the events that were written to Kinesis while the application was down (due to a restart).

We have the following Flink environment settings:

env.enableCheckpointing(1000); // checkpoint interval in milliseconds
env.setStateBackend(new RocksDBStateBackend("file:///<filelocation>", true));
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(pause); 
env.getCheckpointConfig().setCheckpointTimeout(timeOut); 
env.getCheckpointConfig().setMaxConcurrentCheckpoints(concurrency);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

and the Kinesis consumer has the following initial position configured:

kinesisConsumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
                "LATEST");

Interestingly, when I change the Kinesis configuration to replay the events, i.e.

 kinesisConsumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
                "TRIM_HORIZON");

Flink receives all the buffered records from Kinesis (including events generated before, during, and after the period when the Flink application was down) and processes them. Because events from before the restart are processed a second time, this behavior violates the "exactly once" property of the Flink application.

Can someone point out some obvious things I am missing?

Upvotes: 0

Views: 839

Answers (2)

Shankar

Reputation: 2825

The previous answer is a good option if you want to use checkpointing to track your consumer's position in the stream.

Here is an alternative with even more control. You can try using AT_TIMESTAMP as the STREAM_INITIAL_POSITION configuration option in your Flink Kinesis Connector.

This setting requires an additional configuration option, STREAM_INITIAL_TIMESTAMP, which is the timestamp from which you want to read records from Kinesis.

The timestamp value can be maintained in several ways: a sink that updates a text file, a sink that writes to an external database such as DynamoDB, a value supplied manually by the startup script, etc.

When the Flink application is restarted, provide the last processed timestamp as a runtime parameter and use it in the Kinesis consumer's configuration.

Your configuration will look like this:

Properties consumerConfig = new Properties();
consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
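// Pass the timestamp as a String: Properties.getProperty() returns null for non-String values.
String startTimestamp = "1459799926.480"; // seconds since the Unix epoch; parameterize this!
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, startTimestamp);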

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
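
The restart step described above (passing the last processed timestamp as a runtime parameter) could be sketched roughly as follows. This is only an illustration under a few assumptions: the class name KinesisStartPosition and the method buildConsumerConfig are hypothetical, the fallback to LATEST when no timestamp is supplied is my own choice, and the two string keys are what I believe ConsumerConfigConstants.STREAM_INITIAL_POSITION and STREAM_INITIAL_TIMESTAMP resolve to. In a real job, use the constants themselves rather than these literals.

```java
import java.util.Properties;

public class KinesisStartPosition {
    // Assumed string values of ConsumerConfigConstants.STREAM_INITIAL_POSITION and
    // ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP; prefer the constants in real code.
    static final String INITIAL_POSITION_KEY = "flink.stream.initpos";
    static final String INITIAL_TIMESTAMP_KEY = "flink.stream.initpos.timestamp";

    // Builds the start-position part of the consumer config.
    // lastProcessedTimestamp is expected in seconds since the Unix epoch, e.g. "1459799926.480".
    static Properties buildConsumerConfig(String lastProcessedTimestamp) {
        Properties config = new Properties();
        if (lastProcessedTimestamp == null || lastProcessedTimestamp.isEmpty()) {
            // No stored timestamp (hypothetical first run): start from the tip of the stream.
            config.setProperty(INITIAL_POSITION_KEY, "LATEST");
            return config;
        }
        // Fail fast on malformed input; parseDouble throws NumberFormatException.
        double seconds = Double.parseDouble(lastProcessedTimestamp);
        if (seconds < 0) {
            throw new IllegalArgumentException("Timestamp must be non-negative: " + seconds);
        }
        config.setProperty(INITIAL_POSITION_KEY, "AT_TIMESTAMP");
        // Stored as a String so that Properties.getProperty() can read it back.
        config.setProperty(INITIAL_TIMESTAMP_KEY, lastProcessedTimestamp);
        return config;
    }

    public static void main(String[] args) {
        // The first program argument, if present, is the last processed timestamp.
        Properties config = buildConsumerConfig(args.length > 0 ? args[0] : null);
        System.out.println(config);
    }
}
```

The returned Properties would still need the region and credential settings shown above before being handed to FlinkKinesisConsumer. Note that restarting from a timestamp can re-read records at the boundary, so on its own this gives at-least-once rather than exactly-once behavior.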

Upvotes: 0

Gordon Tai

Reputation: 41

The Flink Kinesis connector does store the shard sequence numbers in its checkpointed state for exactly-once processing.

From your description, it seems that the checkpointed state is not being respected when your job "restarts".

Just to eliminate the obvious first: how is your job resuming after the restart? Are you resuming from a savepoint, or is the restart done automatically from a previous checkpoint?

Upvotes: 2
