Reputation: 2229
Hi, I am working on a Confluent Kafka consumer. There are multiple records on my broker, and I want to process all of them. Below is my consumer implementation.
// Note: 'topic' is not used here; the consumer must already be subscribed or assigned.
public ConsumeResult<string, GenericRecord> Consume(string topic)
{
    ConsumeResult<string, GenericRecord> result;
    try
    {
        // Each call returns a single record.
        result = consumer.Consume();
        Commit(result);
        return result;
    }
    catch (Exception e)
    {
        this.logger.Error("KafkaClient", $"Error consuming message '{e.Message}'");
        return null;
    }
}
When the broker holds multiple records, each call gives me only one event/message at a time as a GenericRecord. How can I handle multiple records effectively in the consumer? Any help would be appreciated. Thanks.
Upvotes: 0
Views: 2268
Reputation: 191854
Consume returns one record per call, so you would just call it in a loop. See the examples:
https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/examples/AvroGeneric/Program.cs
// cts is a CancellationTokenSource; cancelling it stops the blocking Consume call.
while (true)
{
    try
    {
        var consumeResult = consumer.Consume(cts.Token);
        Console.WriteLine($"Key: {consumeResult.Message.Key}\nValue: {consumeResult.Message.Value}");
    }
    catch (ConsumeException e)
    {
        Console.WriteLine($"Consume error: {e.Error.Reason}");
    }
}
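Applied to the method in the question, the same loop might look roughly like the sketch below. It is not taken from the linked example. The `ConsumeAll` helper and the `handle` callback are hypothetical names, and it assumes `enable.auto.commit` is set to false so that each offset is committed only after the record has been handled.

```csharp
using System;
using System.Threading;
using Avro.Generic;
using Confluent.Kafka;

public static class ConsumeLoopSketch
{
    // Hypothetical helper: keeps consuming until the token is cancelled,
    // passing every record to the caller-supplied 'handle' callback.
    public static void ConsumeAll(
        IConsumer<string, GenericRecord> consumer,
        string topic,
        Action<ConsumeResult<string, GenericRecord>> handle,
        CancellationToken token)
    {
        consumer.Subscribe(topic);
        try
        {
            while (!token.IsCancellationRequested)
            {
                try
                {
                    // Blocks until one record is available or the token is cancelled.
                    var result = consumer.Consume(token);
                    handle(result);

                    // Assumption: manual commits (enable.auto.commit=false).
                    // Committing after handling gives at-least-once processing.
                    consumer.Commit(result);
                }
                catch (ConsumeException e)
                {
                    // Log and keep looping, as in the loop above.
                    Console.WriteLine($"Consume error: {e.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when the token is cancelled while Consume is blocking.
        }
        finally
        {
            // Leave the consumer group cleanly.
            consumer.Close();
        }
    }
}
```

You would call it with something like `ConsumeLoopSketch.ConsumeAll(consumer, topic, r => Process(r), cts.Token)`, where `Process` stands for whatever you do with each record. Committing every record synchronously is simple but slow, so for higher throughput you may prefer to commit every N records or rely on auto commit.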
Upvotes: 1