Reputation: 3
In a basic Kafka .NET consumer setup like this:
var conf = new ConsumerConfig
{
    GroupId = $"{_topic}{CacheMessageConsumerSuffix}",
    BootstrapServers = Servers,
    AutoOffsetReset = AutoOffsetResetType.Earliest,
};
using (var c = new Consumer<string, string>(conf))
{
    c.Subscribe(_topic);
    bool consuming = true;
    // Stop the loop only when the client reports a fatal error
    c.OnError += (_, e) => consuming = !e.IsFatal;
    while (consuming)
    {
        var cr = c.Consume();
    }
}
When started, that consumer only receives new messages from a producer. How can I first get all existing records for that specific topic and then carry on listening for new ones?
Thanks
Upvotes: 0
Views: 2406
Reputation: 3
I think I worked out how to do it. As suggested in this blog post, http://blog.empeccableweb.com/wp/2016/11/30/manual-offsets-in-kafka-consumers-example/, you can set up an event handler that fires when partitions are assigned to a given consumer and then force those partitions back to the beginning of the log. Something like this:
using (_consumer = new Consumer<string, string>(configuration))
{
    _consumer.OnPartitionsAssigned += OnPartitionsAssigned;
    _consumer.Subscribe("my-topic");
    ...
}

private void OnPartitionsAssigned(object sender, List<TopicPartition> assignedPartitions)
{
    // Build a new list of Topic/Partition/Offset values with offset = Beginning
    var newPartitionOffsets = new List<TopicPartitionOffset>();
    assignedPartitions.ForEach(x => newPartitionOffsets.Add(new TopicPartitionOffset(x, Offset.Beginning)));
    // Assign to consumer (_consumer is a field, so the handler can reach it)
    _consumer.Assign(newPartitionOffsets);
}
Not sure if there is a cleverer way, but this seems to do the trick. Of course, it only works as intended if the event fires just once, right after the subscription starts, and not again during the consumer's lifetime (for example on a rebalance), because every later firing would rewind the partitions again. But even that can probably be managed.
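One way the repeated-firing problem might be managed is to rewind only on the first assignment. The following is a minimal sketch, not a tested implementation. It assumes the same pre-1.0 Confluent.Kafka API used in this thread, and it assumes that Assign also accepts a plain list of TopicPartition. The ReplayingConsumer class and the _rewound flag are hypothetical names introduced for illustration.

```csharp
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;

// Hypothetical wrapper class, introduced only for this sketch.
public class ReplayingConsumer
{
    private readonly Consumer<string, string> _consumer;

    // Hypothetical flag: becomes true once the partitions have been rewound.
    private bool _rewound;

    public ReplayingConsumer(ConsumerConfig configuration, string topic)
    {
        _consumer = new Consumer<string, string>(configuration);
        _consumer.OnPartitionsAssigned += OnPartitionsAssigned;
        _consumer.Subscribe(topic);
    }

    private void OnPartitionsAssigned(object sender, List<TopicPartition> assignedPartitions)
    {
        if (!_rewound)
        {
            // First assignment: start every assigned partition from the beginning.
            _rewound = true;
            _consumer.Assign(assignedPartitions
                .Select(tp => new TopicPartitionOffset(tp, Offset.Beginning)));
        }
        else
        {
            // Later assignments (e.g. after a rebalance): accept them without
            // forcing an offset, so the client chooses where to resume.
            _consumer.Assign(assignedPartitions);
        }
    }
}
```

With a single flag, partitions that first arrive in a later rebalance are not replayed from the start. Tracking rewound partitions individually would cover that case at the cost of a little more bookkeeping.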
Upvotes: 0
Reputation: 192013
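In your config, change the consumer group to a new value (if you already have one) and then add the setting below. AutoOffsetReset only applies when the group has no committed offsets, which is why an existing group keeps resuming from where it left off.
AutoOffsetReset = AutoOffsetResetType.Earliest
Refer to the README.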
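As a rough illustration of that suggestion, the config could look something like the fragment below. The group id format and the broker address are placeholders chosen for this sketch, not values from the question.

```csharp
using System;
using Confluent.Kafka;

var conf = new ConsumerConfig
{
    // Assumed naming scheme: a group id that has never committed offsets,
    // so the AutoOffsetReset policy below is actually consulted.
    GroupId = $"my-topic-replay-{Guid.NewGuid()}",
    // Placeholder broker address.
    BootstrapServers = "localhost:9092",
    AutoOffsetReset = AutoOffsetResetType.Earliest,
};
```

Generating a fresh group id on every start means the topic is replayed in full each time the consumer starts. If a replay is only wanted once, a fixed new group name is probably the better choice.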
Upvotes: 1