whatsinthename

Reputation: 2157

Exactly-once in Flink Kafka producer and consumer

I am trying to achieve exactly-once semantics in a Flink-Kafka integration. I have my producer module as below:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.enableCheckpointing(1000) // checkpoint every 1s
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000) // minimum gap before the next checkpoint can start
env.getCheckpointConfig.setCheckpointTimeout(4000) // checkpoints have to complete within 4s
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) // only 1 checkpoint may be in flight at a time
env.getCheckpointConfig.enableExternalizedCheckpoints(
  ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) // checkpoints are retained if the job is cancelled explicitly
//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10)) // number of restart attempts, delay between restarts

val myProducer = new FlinkKafkaProducer[String](
  "topic_name", // target topic
  new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), // serialization schema
  getProperties(), // producer config
  FlinkKafkaProducer.Semantic.EXACTLY_ONCE) // fault-tolerance semantic

Consumer Module:

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")

    val consumer = new FlinkKafkaConsumer[String]("topic_name", new SimpleStringSchema(), properties)
  

I am generating a few records and pushing them through this producer. The records look like this:

1  
2  
3  
4  
5  
6  
..  
..  

and so on. Suppose that while pushing this data, the producer managed to write records up to the 4th and then went down due to some failure. When it is up and running again, will it resume pushing from the 5th record onwards? Are my properties enough for that?

I will be adding one property on the consumer side, as per this link mentioned in the first comment. Should I add the idempotence property on the producer side as well?

My Flink version is 1.13.5, with Scala 2.11.12, and I am using the Flink Kafka connector built for Scala 2.11.

I think the transactions are not being committed under EXACTLY_ONCE, because no checkpoints are written at the configured path. Attaching screenshots of the Web UI:

[screenshots of the Flink Web UI checkpoints tab]

Do I need to set any property for that?

Upvotes: 2

Views: 3327

Answers (1)

Yun Gao

Reputation: 56

On the consumer side, the Flink Kafka Consumer bookkeeps the current offset in the distributed checkpoint. If the consumer task fails, it restarts from the latest checkpoint and re-emits records starting from the offset recorded there. For example, suppose the latest checkpoint records offset 3, and after that Flink emits records 4 and 5 and then fails over; Flink will then continue emitting from record 4. Note that this does not cause duplication, since the state of all operators is also rolled back to the state after processing record 3.
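The rollback described above can be sketched with a toy model (plain Scala, not the Flink API; all names here are illustrative):

```scala
// Toy model: how a checkpointed offset plus rolled-back operator state
// makes replay exactly-once in effect.
case class Checkpoint(offset: Int, state: Long) // state = running sum of processed records

val records = (1 to 6).toList

// Latest completed checkpoint: offset 3, state = sum of records 1..3.
val ckpt = Checkpoint(offset = 3, state = records.take(3).map(_.toLong).sum)

// After the checkpoint, records 4 and 5 were emitted, then the job failed.
// On recovery, BOTH the read position and the operator state are restored
// from the checkpoint, so 4 and 5 are re-read but counted only once.
val replayed   = records.drop(ckpt.offset)          // resume from record 4
val finalState = ckpt.state + replayed.map(_.toLong).sum

assert(finalState == (1 to 6).sum)                  // no loss, no duplication
```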

On the producer side, Flink uses a two-phase commit [1] to achieve exactly-once. Roughly speaking, the Flink producer relies on Kafka transactions to write data, and the data only becomes visible downstream once the transaction is committed on checkpoint completion [2]. Use FlinkKafkaProducer.Semantic.EXACTLY_ONCE to enable this functionality.
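As a sketch of the extra Kafka client properties usually involved (assuming a local broker; the values are illustrative, and these are standard Kafka settings rather than Flink-specific ones): the producer's transaction timeout must fit within the broker's `transaction.max.timeout.ms`, and the downstream consumer should read only committed data:

```scala
import java.util.Properties

// Producer side: Flink's FlinkKafkaProducer defaults transaction.timeout.ms
// to 1 hour, which exceeds the broker's transaction.max.timeout.ms default
// of 15 minutes and would be rejected, so set it explicitly.
val producerProps = new Properties()
producerProps.setProperty("bootstrap.servers", "localhost:9092")
producerProps.setProperty("transaction.timeout.ms", "900000") // <= broker max

// Consumer side: read only committed records, so data from open or
// aborted transactions stays invisible downstream.
val consumerProps = new Properties()
consumerProps.setProperty("bootstrap.servers", "localhost:9092")
consumerProps.setProperty("isolation.level", "read_committed")
```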

[1] https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html

[2] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#fault-tolerance

Upvotes: 3
