Seyed Morteza Mousavi

Reputation: 6963

Return from Kafka consumer when there is no message

I want to process a topic at application startup using the Confluent .NET client. Assume the following example:

    while (true)
    {
        try
        {
            var cr = c.Consume();
            Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
        }
        catch (ConsumeException e)
        {
            Console.WriteLine($"Error occurred: {e.Error.Reason}");
        }
    }

When there is no new message in Kafka, c.Consume blocks. Because I want to use it at application startup (for example, for cache warm-up), I want my code to proceed as soon as I find that there are no new messages.

I know there is an overload that takes a timeout, c.Consume(timeout), but the problem with this approach is that if there is a message in the topic and reading it takes longer than the timeout, you receive null, which is not desirable.

Upvotes: 4

Views: 8072

Answers (4)

Shishir Kumar Bhoi

Reputation: 11

In the config, enable partition EOF events:

    var config = new ConsumerConfig { EnablePartitionEof = true };

Then check cr.IsPartitionEOF after each call to Consume.
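A minimal sketch of how this could look for the startup case in the question, assuming the 1.x Confluent.Kafka client. The broker address, group id, topic name "my-topic" and partition 0 are placeholders, not values from the question. A single partition is assigned explicitly so that one EOF result means the consumer has caught up; with several partitions you would need to track EOF per partition.

    using System;
    using Confluent.Kafka;

    class CacheWarmup
    {
        static void Main()
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = "localhost:9092", // assumption: local broker
                GroupId = "cache-warmup",            // assumption: any group id
                EnablePartitionEof = true            // emit a result when the partition end is reached
            };

            using (var c = new ConsumerBuilder<Ignore, string>(config).Build())
            {
                // Read partition 0 of the (hypothetical) topic from the beginning.
                c.Assign(new TopicPartitionOffset("my-topic", 0, Offset.Beginning));

                while (true)
                {
                    var cr = c.Consume();
                    if (cr.IsPartitionEOF)
                    {
                        // No more messages at the moment: warm-up is done.
                        break;
                    }
                    Console.WriteLine($"Consumed message '{cr.Message.Value}' at: '{cr.TopicPartitionOffset}'.");
                }

                c.Close();
            }
        }
    }

Because Consume is called without a timeout here, a slow read does not produce a null result; the loop ends only when the client reports the end of the partition.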

Upvotes: 1

Pabitro

Reputation: 36

I found ConsumeResult.IsPartitionEOF useful.

Upvotes: 0

Francois

Reputation: 3090

Consumers are not supposed to be aware of producers.

Now if you want to know that you have read everything in the topic from the moment you start to consume, you can:

  1. Load the newest offset before starting to consume.
  2. Then start consuming messages.
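  3. If the message's offset is one less than the newest offset you loaded before, stop consuming (the newest offset is the offset the next message will receive, so the last existing message sits just below it).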

I'm not a C# developer, but from what I read in the Confluent .NET docs you can call QueryWatermarkOffsets on the consumer to get the oldest and newest offsets. https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.Consumer.html#Confluent_Kafka_Consumer_QueryWatermarkOffsets_Confluent_Kafka_TopicPartition_

Then, the Message class has an Offset accessor, so the whole thing should not be too hard to achieve. https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.Message.html#Confluent_Kafka_Message_Offset
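The steps above might be sketched as follows, assuming the 1.x Confluent.Kafka client, where the offset is read from the ConsumeResult. The broker address, group id, topic name "my-topic" and partition 0 are placeholders. The high watermark is the offset the next produced message will get, so the last existing message is at High - 1. This sketch also assumes a plain, non-transactional topic, where that last offset holds a consumable message.

    using System;
    using Confluent.Kafka;

    class WatermarkWarmup
    {
        static void Main()
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = "localhost:9092", // assumption: local broker
                GroupId = "cache-warmup"             // assumption: any group id
            };
            var tp = new TopicPartition("my-topic", 0); // hypothetical topic, single partition

            using (var c = new ConsumerBuilder<Ignore, string>(config).Build())
            {
                // 1. Load the oldest (Low) and newest (High) offsets from the broker.
                WatermarkOffsets w = c.QueryWatermarkOffsets(tp, TimeSpan.FromSeconds(10));
                long lastOffset = w.High.Value - 1;

                // 2. Start consuming from the beginning of the partition.
                c.Assign(new TopicPartitionOffset(tp, Offset.Beginning));

                // Low == High means there is nothing to read, so skip the loop entirely.
                if (w.Low.Value < w.High.Value)
                {
                    while (true)
                    {
                        var cr = c.Consume();
                        Console.WriteLine($"Consumed message '{cr.Message.Value}' at: '{cr.TopicPartitionOffset}'.");

                        // 3. Stop once the last message that existed at startup has been read.
                        if (cr.Offset.Value >= lastOffset)
                        {
                            break;
                        }
                    }
                }

                c.Close();
            }
        }
    }

Messages produced after the watermark query are deliberately ignored by the warm-up loop, which gives it a fixed end point even while producers keep writing.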

Upvotes: 3

Amin

Reputation: 1023

You can use the OnPartitionEOF event, which indicates that you have reached the end of a partition.

    CancellationTokenSource source = new CancellationTokenSource();
    bool isContinue = true;

    // Stop the loop and cancel the pending Consume call once the end of the partition is reached.
    c.OnPartitionEOF += (o, e) =>
    {
        Console.WriteLine("You have reached end of partition");
        isContinue = false;
        source.Cancel();
    };

    while (isContinue)
    {
        try
        {
            var cr = c.Consume(source.Token);
            Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
        }
        catch (ConsumeException e)
        {
            Console.WriteLine($"Error occurred: {e.Error.Reason}");
        }
        catch (OperationCanceledException)
        {
            // Thrown by Consume once source.Cancel() has been called; leave the loop.
            break;
        }
    }

Upvotes: 3
