Reputation: 1098
I have created a Kafka consumer with the Apache Flink API, written in Scala. Whenever I send messages to a topic, the consumer duly receives them. However, when I restart the consumer, instead of receiving only the new, unconsumed messages, it re-consumes the latest messages that were already sent to that topic.
Here's what I am doing:
Running the producer:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic corr2
Running the consumer:
import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")

val env = StreamExecutionEnvironment.getExecutionEnvironment
val st = env
  .addSource(new FlinkKafkaConsumer09[String]("corr2", new SimpleStringSchema(), properties))
env.enableCheckpointing(5000) // checkpoint the job (including Kafka offsets) every 5 seconds
st.print()
env.execute()
Passing some messages:
Upvotes: 3
Views: 3316
Reputation: 4542
You are running a Kafka consumer with a checkpoint interval of 5 seconds. So every 5 seconds, Flink creates a copy of your operator's state (the Kafka offsets) for recovery.
Once a checkpoint is complete, Flink notifies the operator that the checkpoint has finished. On that notification, the Kafka consumer commits the offsets to ZooKeeper. So roughly every 5 seconds, the offsets of the last checkpoint are written into ZooKeeper.
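For reference, here is a minimal sketch of that configuration. The setCommitOffsetsOnCheckpoints call is an assumption: the setter exists on the Flink Kafka consumer in later releases, and true is already the default once checkpointing is on; it is shown only to make the commit mechanism explicit.

import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(5000) // snapshot operator state (including offsets) every 5 s

val consumer = new FlinkKafkaConsumer09[String]("corr2", new SimpleStringSchema(), properties)
// Assumption: available in later Flink releases; true is already the default
// when checkpointing is enabled. Offsets are committed back only when a
// checkpoint completes, not on every record.
consumer.setCommitOffsetsOnCheckpoints(true)

env.addSource(consumer).print()
env.execute()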
When you start the Flink job again, it finds those offsets in ZooKeeper and resumes from there. Depending on the timing, all messages received after the last commit to ZooKeeper will be sent again.
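If you instead want a restarted job to ignore the stored offsets and begin at the tip of the topic (sacrificing the at-least-once replay), one workaround is to start with a fresh consumer group, so no committed offsets are found and Kafka's auto.offset.reset property decides the starting position. A sketch, with the group name as a hypothetical placeholder:

import java.util.Properties

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// Hypothetical fresh group id: nothing has been committed under it yet,
// so the consumer falls back to auto.offset.reset.
properties.setProperty("group.id", "test-fresh-run")
// "latest" = start at the newest records; "earliest" = replay from the beginning.
properties.setProperty("auto.offset.reset", "latest")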
You cannot avoid this behavior, because the .print() "operator" is not part of the checkpointing; it is meant as a debugging utility.
However, a data sink that participates in the checkpointing (for example, the rolling file sink) will ensure that no duplicates are written to the file system.
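For illustration, a minimal sketch of wiring the stream from the question into such a sink, assuming the flink-connector-filesystem dependency is available; the output path is a placeholder:

import org.apache.flink.streaming.connectors.fs.RollingSink

// RollingSink takes part in checkpointing: a part file is finalized only once
// the checkpoint covering its records completes, so records replayed after a
// restart do not end up twice in the output.
st.addSink(new RollingSink[String]("/tmp/flink-kafka-output")) // placeholder path

Here st is the DataStream[String] built in the question.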
Upvotes: 6