Owl

Reputation: 3

Kafka .NET - get all existing records for that specific topic and then carry on listening for new ones

In a basic Kafka .NET consumer setup like this

var conf = new ConsumerConfig
{
    GroupId = $"{_topic}{CacheMessageConsumerSuffix}",
    BootstrapServers = Servers,
    AutoOffsetReset = AutoOffsetResetType.Earliest,
};


using (var c = new Consumer<string, string>(conf))
{
    c.Subscribe(_topic);

    bool consuming = true;
    c.OnError += (_, e) => consuming = !e.IsFatal;

    while (consuming)
    {
        var cr = c.Consume();
    }
}

When started, that consumer only receives new messages from a producer. How can I first get all existing records for that specific topic and then carry on listening for new ones?

Thanks

Upvotes: 0

Views: 2406

Answers (2)

Owl

Reputation: 3

I think I worked out how to do it. As suggested in this blog post, http://blog.empeccableweb.com/wp/2016/11/30/manual-offsets-in-kafka-consumers-example/, you can set up an event handler to be triggered when partitions are assigned to a given consumer and then force those partitions back to offset 0. Something like this:

using (var consumer = new Consumer<string, string>(configuration))
{
    consumer.OnPartitionsAssigned += OnPartitionsAssigned;
    consumer.Subscribe("my-topic");
    ...
}

private void OnPartitionsAssigned(object sender, List<TopicPartition> assignedPartitions)
{
    // The consumer that raised the event is passed as the sender
    // (assumption: the client follows the usual EventHandler pattern)
    var consumer = (Consumer<string, string>)sender;

    // Build a new list of Topic/Partition/Offset values with offset = Beginning
    var newPartitionOffsets = new List<TopicPartitionOffset>();
    assignedPartitions.ForEach(x => newPartitionOffsets.Add(new TopicPartitionOffset(x, Offset.Beginning)));

    // Assign to consumer
    consumer.Assign(newPartitionOffsets);
}

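I'm not sure whether there is a cleverer way, but this seems to do the trick. Note that partitions are also reassigned whenever the consumer group rebalances, so the handler would rewind to the beginning again each time. It only behaves as intended if the event fires once, immediately after the subscription starts, and not again during the consumer's lifetime. Even that can probably be managed.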

Upvotes: 0

OneCricketeer

Reputation: 192013

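In your config, set the following. If your current consumer group has already committed offsets, change the group id to a new value first, because this setting only applies when the group has no committed offset for a partition.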

AutoOffsetReset = AutoOffsetResetType.Earliest

Refer to the README.
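
As a rough sketch of that advice, and not the only way to do it, the config fragment below uses a group id that has never committed offsets, so the consumer starts from the earliest available record. The topic name, the GUID suffix, and the broker address are illustrative assumptions, not values from the question. The enum name follows the question's client version; I believe later 1.x releases name it AutoOffsetReset instead of AutoOffsetResetType.

```csharp
using System;
using Confluent.Kafka;

var conf = new ConsumerConfig
{
    // A fresh group id with no committed offsets (the GUID suffix is a hypothetical choice)
    GroupId = $"my-topic-cache-{Guid.NewGuid()}",
    // Placeholder broker address
    BootstrapServers = "localhost:9092",
    // Only takes effect when the group has no committed offset for a partition
    AutoOffsetReset = AutoOffsetResetType.Earliest,
};
```

A new group id on every start means every start replays the whole topic, which matches the "load everything, then keep listening" use case, but it also leaves unused groups behind on the broker until their offsets expire.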

Upvotes: 1
