Dhinesh

Reputation: 325

How to get the latest offset from a Kafka topic in the Confluent Kafka C# library?

I am using the Confluent Kafka C# client. How do I get the latest offset consumed from a topic with it?

Upvotes: 7

Views: 16221

Answers (4)

hamed nasrollahi

Reputation: 19

You can use the Position() method on the consumer, like this:

var partition = 0;
Offset lastOffset = _kafkaConsumer.Position(new TopicPartition("topic", partition));

Upvotes: 0

Tomas Chabada

Reputation: 3019

Instead of retrieving offset information from the consumer (I didn't want to consume a message first), I was able to read the topic offsets (high and low) from the producer like this:

var partitionOffset = _producer.QueryWatermarkOffsets(new TopicPartition("myTopic", myPartition), TimeSpan.FromSeconds(10));

Upvotes: 1

Hans Jespersen

Reputation: 8335

When you receive a message, it should include the topic, partition, and offset it came from (in addition to the Key and Value).

From the example here:

consumer.OnMessage += (_, msg)
  => Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} " +
        $"Offset: {msg.Offset} {msg.Value}");

You also get an event when the consumer reaches the end of each topic partition:

consumer.OnPartitionEOF += (_, end)
  => Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}" +
          $" , next message will be at offset {end.Offset}");

Upvotes: 1

Treziac

Reputation: 3264

In addition to the previous answer, you can use

List<TopicPartitionOffsetError> Position(IEnumerable<TopicPartition> partitions)

It returns the last offset polled from librdkafka for the given topic/partitions.

There is a similar Committed method, which returns the latest offset committed by the consumer.


You can also query the latest known offsets with

WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout)

It sends a request to the Kafka cluster. The call is blocking, so set a proper timeout. Currently, you cannot send a request for multiple partitions at once. You can use it either to get the last known offset or to compute lag.
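As a rough sketch of the blocking query described above, assuming the 1.x Confluent.Kafka API (where QueryWatermarkOffsets is available on the consumer): the broker address, group id, topic name and partition number below are placeholders, not values from the question.

```csharp
using System;
using Confluent.Kafka;

class LatestOffsetSketch
{
    static void Main()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092", // assumption: local broker
            GroupId = "offset-inspector"         // assumption: any group id; nothing is consumed here
        };

        // Ignore/Ignore because no messages are deserialized in this sketch.
        using (var consumer = new ConsumerBuilder<Ignore, Ignore>(config).Build())
        {
            var topicPartition = new TopicPartition("myTopic", new Partition(0)); // placeholder topic/partition

            // Blocking round trip to the cluster; throws KafkaException on failure or timeout.
            WatermarkOffsets watermarks =
                consumer.QueryWatermarkOffsets(topicPartition, TimeSpan.FromSeconds(10));

            // High is the offset the next produced message will get; Low is the oldest available offset.
            Console.WriteLine($"Low: {watermarks.Low.Value}, High: {watermarks.High.Value}");
        }
    }
}
```

Since the call handles one partition at a time, covering a whole topic means looping over its partitions and calling it once per partition.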

There is also

WatermarkOffsets GetWatermarkOffsets(TopicPartition topicPartition)

which queries the internal state in librdkafka and can return INVALID_OFFSET (-1001). You can use it to detect lag caused by processing the data (the difference between the position and the result of this method).
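To make the lag computation concrete, here is a minimal sketch that works on plain long values rather than on the library types. LagSketch and ComputeLag are hypothetical names for illustration; the only thing taken from the library is the -1001 sentinel mentioned above.

```csharp
using System;

static class LagSketch
{
    // librdkafka's "invalid offset" sentinel, as quoted above.
    const long InvalidOffset = -1001;

    // Lag = high watermark minus current position.
    // Returns null when either value is not known yet.
    public static long? ComputeLag(long highWatermark, long position)
    {
        if (highWatermark == InvalidOffset || position == InvalidOffset)
            return null;

        // Clamp at zero in case the cached watermark is slightly stale.
        return Math.Max(0, highWatermark - position);
    }

    static void Main()
    {
        // Hypothetical values: high watermark 120, consumer position 100.
        Console.WriteLine(ComputeLag(120, 100));

        // Watermark not cached yet: lag is unknown rather than a negative number.
        Console.WriteLine(ComputeLag(InvalidOffset, 100).HasValue);
    }
}
```

In real code the two inputs would come from the High value of GetWatermarkOffsets and from Position; treating -1001 as "unknown" avoids reporting a nonsensical lag before librdkafka has cached a watermark.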

Upvotes: 7
