FBryant87

Reputation: 4615

Don't read next Kafka message until current message is processed?

When consuming messages using the Confluent Kafka C# library, I only want to see a new message when I've finished processing the current one (I need to read the same message again if something fails). In other words, I don't want the offset to change until I explicitly tell it to change.

To try to achieve this, I disable auto-commit in the config (just like in the examples):

{ "enable.auto.commit", false }
{ "auto.offset.reset", "smallest" }

I then comment out the commit line:

while(true)
{
    if (!consumer.Consume(out Message<string, string> msg, TimeSpan.FromMilliseconds(100)))
    {
        continue;
    }

    // I thought that by removing this line, I would keep getting the same message (until I've processed the message and committed the offset)
    //consumer.CommitAsync(msg).Result;
}

My hope was that by not committing, I would keep getting the same message when calling Consume(), but this isn't the case. Even though I don't commit, the offset keeps changing and I get a new message on every call.

Please clear up my obvious misunderstanding?

Upvotes: 1

Views: 3268

Answers (1)

Marc Harry

Reputation: 2430

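Calling CommitAsync saves your offset to an internal offsets topic stored by Kafka. So if you were to dispose your consumer and create a new one with the same group ID, it would start again from the last committed offset. If no offset was ever committed, it would start wherever auto.offset.reset points, which with "smallest" is the earliest available offset (offset 0 on a partition where nothing has been deleted yet).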

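If you don't commit the offset but continue using the same consumer in your app, the offset will still be incremented and held in memory by the consumer. Committing has no effect on which message the next Consume() call returns from that same consumer instance.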
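To make the distinction concrete, here is a minimal toy model of the two offsets involved. It is only a sketch and deliberately does not use the Confluent.Kafka library: SimulatedConsumer, its members, and the dictionary standing in for Kafka's offset storage are all hypothetical names made up for illustration. It assumes a single partition and "start from the beginning when nothing is committed" behaviour.

```csharp
using System;
using System.Collections.Generic;

// Toy model only: this is NOT the Confluent.Kafka API. It mimics the two
// offsets in play: the in-memory position and the committed offset.
public class SimulatedConsumer
{
    private readonly IReadOnlyList<string> _log;              // stands in for one partition
    private readonly Dictionary<string, int> _committedStore; // stands in for Kafka's offset storage
    private readonly string _groupId;

    // In-memory position: the offset the next Consume() call will read.
    public int Position { get; private set; }

    public SimulatedConsumer(IReadOnlyList<string> log,
                             Dictionary<string, int> committedStore,
                             string groupId)
    {
        _log = log;
        _committedStore = committedStore;
        _groupId = groupId;
        // A new consumer resumes from the committed offset, or from the
        // start of the log if this group has never committed.
        Position = committedStore.TryGetValue(groupId, out var committed) ? committed : 0;
    }

    // Returns the next message and advances the position, commit or no commit.
    public bool Consume(out int offset, out string value)
    {
        if (Position >= _log.Count)
        {
            offset = -1;
            value = string.Empty;
            return false;
        }
        offset = Position;
        value = _log[Position];
        Position++;
        return true;
    }

    // Stores the offset of the NEXT message to read (offset + 1).
    public void Commit(int offset) => _committedStore[_groupId] = offset + 1;

    // Moves the in-memory position back (or forward) explicitly.
    public void Seek(int offset) => Position = offset;
}

public static class Demo
{
    public static void Main()
    {
        var log = new List<string> { "a", "b", "c" };
        var store = new Dictionary<string, int>();
        var consumer = new SimulatedConsumer(log, store, "g1");

        consumer.Consume(out var o1, out var v1);
        consumer.Consume(out var o2, out var v2);   // advances even though nothing was committed
        Console.WriteLine($"{v1} {v2}");            // a b

        consumer.Seek(o2);                          // rewind to re-read the second message
        consumer.Consume(out _, out var again);
        Console.WriteLine(again);                   // b

        consumer.Commit(o1);                        // only the first message is marked as done
        var restarted = new SimulatedConsumer(log, store, "g1");
        restarted.Consume(out _, out var afterRestart);
        Console.WriteLine(afterRestart);            // b
    }
}
```

In this model, the only ways to see a message again are to move the position back explicitly or to start a new consumer that resumes from the committed offset. As far as I know, the real client offers a Seek method on the consumer for the first option, but check the documentation for the library version you are using.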

Although it is written for the Java consumer class, the documentation section "Offsets and Consumer Position" describes the function of offsets and committing in more detail: KafkaConsumer Docs

Upvotes: 3
