Lucas Gazire

Reputation: 311

Configure kafka-net to stop sending latest messages

I'm using Kafka 0.8.1.1 on a Red Hat VM with the kafka-net client library. How can I configure my consumer so that it stops receiving earlier messages from Kafka?

My consumer code:

var options = new KafkaOptions(new Uri("tcp://199.53.249.150:9092"), new Uri("tcp://199.53.249.151:9092"));

Stopwatch sp = new Stopwatch();
var router = new BrokerRouter(options);
var consumer = new Consumer(new ConsumerOptions("Test", router));

ThreadStart start2 = () =>
{
    while (true)
    {
        sp.Start();
        foreach (var message in consumer.Consume())
        {
            // Decode once and reuse the result instead of decoding twice.
            var decoded = MessageDecoderReceiver.MessageBase(message.Value);
            if (decoded != null)
            {
                PrintMessage(decoded.ToString());
            }
            else
            {
                Console.WriteLine(message.Value);
            }
        }
        sp.Stop();
    }
};
var thread2 = new Thread(start2);
thread2.Start();

Upvotes: 10

Views: 4409

Answers (1)

James Roland

Reputation: 8262

The Consumer in kafka-net does not currently track consumed offsets automatically. You will have to implement the offset tracking manually.
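As a rough illustration of what "manual tracking" can mean, here is a minimal sketch of an in-memory tracker that remembers, per partition, the next offset to read. `OffsetTracker`, `MarkProcessed` and `NextOffset` are hypothetical names invented for this example; they are not part of kafka-net. The sketch also assumes you can read the partition id and offset of each consumed message.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of kafka-net): remembers, per partition,
// the offset that should be read next.
public sealed class OffsetTracker
{
    private readonly object _gate = new object();
    private readonly Dictionary<int, long> _next = new Dictionary<int, long>();

    // Record that the message at `offset` was processed.
    // The stored value is offset + 1 and never moves backwards.
    public void MarkProcessed(int partitionId, long offset)
    {
        lock (_gate)
        {
            long current;
            if (!_next.TryGetValue(partitionId, out current) || offset + 1 > current)
            {
                _next[partitionId] = offset + 1;
            }
        }
    }

    // Offset to resume from, or `fallback` if nothing was recorded
    // for this partition yet.
    public long NextOffset(int partitionId, long fallback = 0)
    {
        lock (_gate)
        {
            long next;
            return _next.TryGetValue(partitionId, out next) ? next : fallback;
        }
    }
}
```

In the consume loop you would call `MarkProcessed` after handling each message, and periodically persist `NextOffset` for each partition, for example with an offset commit request. Whether you store the last processed offset or the next one to read is a convention; the only requirement is that the code that restores the position uses the same one.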

To store the offset in Kafka version 0.8.1:

var commit = new OffsetCommitRequest
{
    ConsumerGroup = consumerGroup,
    OffsetCommits = new List<OffsetCommit>
    {
        new OffsetCommit
        {
            PartitionId = partitionId,
            Topic = IntegrationConfig.IntegrationTopic,
            Offset = offset,
            Metadata = metadata
        }
    }
};

var commitResponse = conn.Connection.SendAsync(commit).Result.FirstOrDefault();

To make a consumer start consuming at a specific offset:

// `consumer` is an existing Consumer instance, used here only to look up the current offsets.
var offsets = consumer.GetTopicOffsetAsync(IntegrationConfig.IntegrationTopic).Result
    .Select(x => new OffsetPosition(x.PartitionId, x.Offsets.Max())).ToArray();

// Create the consumer that starts at those offsets.
var tailConsumer = new Consumer(new ConsumerOptions(IntegrationConfig.IntegrationTopic, router), offsets);

Note that the above code sets the consumer to start at the very end of the log, so it effectively receives only new messages.

Upvotes: 13
