Niranjan
Niranjan

Reputation: 2229

How to handle multiple records in Kafka Consumer?

Hi I am working on Confluent Kafka Consumer. I have multiple records in my broker. I want to handle all the records now. Below is my implementation of consumer.

public ConsumeResult<string, GenericRecord> Consume(string topic)
    {
      ConsumeResult<string, GenericRecord> result;
      try
      {
        result = consumer.Consume();
        Commit(result);
        return result;
      }
      catch (Exception e)
      {
        this.logger.Error("KafkaClient", $"Error sending message '{e.Message}'");
        return null;
      }
    }

Result retuurned from consumer If there are multiple records inside Broker, Then one event/message I will get a time using GenericRecord. If there are multiple records Then how to handle consumer effectively? Any help would be appreciated. Thanks

Upvotes: 0

Views: 2268

Answers (1)

OneCricketeer
OneCricketeer

Reputation: 191854

You would just loop. See the examples

https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/examples/AvroGeneric/Program.cs

while (true)
{
    try
    {
        var consumeResult = consumer.Consume(cts.Token);

        Console.WriteLine($"Key: {consumeResult.Message.Key}\nValue: {consumeResult.Value}");
    }
    catch (ConsumeException e)
    {
        Console.WriteLine($"Consume error: {e.Error.Reason}");
    }
}

Upvotes: 1

Related Questions