Reputation: 2157
I am trying to achieve Exactly-Once semantics in a Flink-Kafka integration. My producer module is as below:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.enableCheckpointing(1000) //Take a checkpoint every 1000 ms; transactions are committed when a checkpoint completes
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000) //Minimum gap between the end of one checkpoint and the start of the next
env.getCheckpointConfig.setCheckpointTimeout(4000) //Checkpoints have to complete within 4 seconds
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) //Only one checkpoint may be in progress at a time
env.getCheckpointConfig.enableExternalizedCheckpoints(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) //Checkpoints are retained if the job is cancelled explicitly
//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10)) //Number of restart attempts and the delay between them
val myProducer = new FlinkKafkaProducer[String](
"topic_name", // target topic
new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), // serialization schema
getProperties(), // producer config
FlinkKafkaProducer.Semantic.EXACTLY_ONCE) // fault-tolerance semantic
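For completeness, here is a sketch of what I understand getProperties() needs to return for EXACTLY_ONCE (the exact values are an assumption; the Flink documentation notes that transaction.timeout.ms must not exceed the broker's transaction.max.timeout.ms, which defaults to 15 minutes, while Flink's producer defaults to 1 hour):
def getProperties(): Properties = {
  val props = new Properties()
  props.setProperty("bootstrap.servers", "localhost:9092")
  //Must be <= the broker's transaction.max.timeout.ms (15 minutes by default),
  //otherwise the EXACTLY_ONCE producer fails when opening transactions
  props.setProperty("transaction.timeout.ms", "900000")
  props
}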
Consumer Module:
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("zookeeper.connect", "localhost:2181") //Presumably unused: only the old 0.8 connector needed ZooKeeper
val consumer = new FlinkKafkaConsumer[String]("topic_name", new SimpleStringSchema(), properties)
I am generating a few records and pushing them to this producer. The records look like this:
1
2
3
4
5
6
..
..
and so on. Suppose that while pushing this data, the producer managed to push up to the 4th record and then went down due to some failure. When it is up and running again, will it push records from the 5th onwards? Are my properties enough for that?
I will be adding one property on the consumer side as per this link mentioned by the first user. Should I add the idempotence property on the producer side as well?
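Assuming the consumer-side property in question is isolation.level (the standard setting for reading only committed transactional records), it would look like this:
properties.setProperty("isolation.level", "read_committed") //Ignore records from uncommitted or aborted transactions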
My Flink version is 1.13.5, Scala 2.11.12, and I am using the Flink Kafka connector 2.11.
I think I am not able to commit the transactions using EXACTLY_ONCE because checkpoints are not written to the mentioned path. Attaching screenshots of the Web UI:
Do I need to set any property for that?
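For example, do I need something like the following (a guess based on the Flink 1.13 docs; the path is a placeholder)?
env.getCheckpointConfig.setCheckpointStorage("file:///tmp/flink-checkpoints") //Directory where checkpoint data is written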
Upvotes: 2
Views: 3327
Reputation: 56
For the consumer side, the Flink Kafka consumer records the current offset in the distributed checkpoint, and if the consumer task fails, it restarts from the latest checkpoint and re-emits from the offset recorded in that checkpoint. For example, suppose the latest checkpoint records offset 3, and after that Flink emits 4 and 5 and then fails over; Flink would then continue to emit records from 4. Note that this does not cause duplication, since the state of all operators is also rolled back to the state right after record 3 was processed.
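A minimal sketch of that consumer side (topic, address, and properties are placeholders; the key point is that enabling checkpointing is what records the offsets):
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(1000) //Offsets are snapshotted as part of each checkpoint
val consumer = new FlinkKafkaConsumer[String]("topic_name", new SimpleStringSchema(), properties)
consumer.setStartFromGroupOffsets() //Only used on a fresh start with no checkpoint to restore from
env.addSource(consumer).print()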
For the producer side, Flink uses two-phase commit [1] to achieve exactly-once. Roughly, the Flink producer relies on Kafka transactions to write data, and the data is only formally committed when the transaction is committed, which happens once the corresponding checkpoint completes. Users can pass Semantic.EXACTLY_ONCE to enable this functionality.
[1] https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
Upvotes: 3