Piyush Shrivastava

Reputation: 1098

Kafka consuming the latest message again when I rerun the Flink consumer

I have created a Kafka consumer with the Apache Flink API, written in Scala. Whenever I send messages to a topic, the consumer duly receives them. However, when I restart the consumer, instead of receiving only new or unconsumed messages, it re-consumes the latest message that was sent to that topic.

Here's what I am doing:

  1. Running the producer:

    $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic corr2
    
  2. Running the consumer:

    import java.util.Properties

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "test")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val st = env
        .addSource(new FlinkKafkaConsumer09[String]("corr2", new SimpleStringSchema(), properties))
    env.enableCheckpointing(5000) // checkpoint (and commit offsets) every 5 seconds
    st.print()
    env.execute()
    
  3. Passing some messages

  4. Stopping the consumer
  5. Running the consumer again prints the last message I sent. I want it to print only new messages.

Upvotes: 3

Views: 3316

Answers (1)

Robert Metzger

Reputation: 4542

You are running a Kafka consumer with a checkpoint interval of 5 seconds. So every 5 seconds, Flink is creating a copy of your operator's state (the offsets) for recovery.

Once the checkpoint is completed, Flink notifies the operator that the checkpoint is finished. On that notification, the Kafka consumer commits its offsets to ZooKeeper. So roughly every 5 seconds, the offsets of the last checkpoint are written to ZooKeeper.

When you start the Flink job again, it will find the offsets in ZooKeeper and resume from there. Depending on the timing, any messages received after the last commit to ZooKeeper will be sent again.
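For illustration, a shorter checkpoint interval commits offsets more often and therefore narrows, but never fully closes, that replay window (the 1000 ms value here is only an example):

    // Sketch: committing offsets more often shrinks the window of
    // messages that can be replayed after a restart.
    env.enableCheckpointing(1000)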

You cannot avoid this behavior, because the .print() "operator" is not part of the checkpointing; it is meant as a debugging utility. However, a data sink that participates in the checkpointing (for example, the rolling file sink) will ensure that no duplicates are written to the file system, as in the sketch below.
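For illustration, a minimal sketch of swapping print() for the rolling file sink (the output path is a placeholder, and the flink-connector-filesystem dependency is assumed to be on the classpath):

    import org.apache.flink.streaming.connectors.fs.RollingSink

    // Sketch: RollingSink participates in Flink's checkpointing, so records
    // replayed after a restart are not written to the output files twice.
    // "/tmp/corr2-out" is a placeholder path.
    st.addSink(new RollingSink[String]("/tmp/corr2-out"))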

Upvotes: 6
