user2247651
user2247651

Reputation: 145

Kafka consumer is not consuming message

I am new in Kafka. kafka consumer is not reading message from the given topic. I am checking with kafka console as well. it is not working. i donot understand the problem. it was working fine earlier.

public string MessageConsumer(string brokerList, List<string> topics, CancellationToken cancellationToken)
    {

        //ConfigurationManager.AutoLoadAppSettings("", "", true);
        string logKey = string.Format("ARIConsumer.StartPRoducer ==>Topics {0} Key{1} =>", "", string.Join(",", topics));

        string message = string.Empty;
        var conf = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "23",
            EnableAutoCommit = false,                
            AutoOffsetReset = AutoOffsetResetType.Latest,
        };

        using (var c = new Consumer<Ignore, string>(conf))
        {
            try
            {
                c.Subscribe(topics);
                bool consuming = true;
                // The client will automatically recover from non-fatal errors. You typically
                // don't need to take any action unless an error is marked as fatal.
                c.OnError += (_, e) => consuming = !e.IsFatal;
                while (consuming)
                {
                    try
                    {
                        TimeSpan timeSpan = new TimeSpan(0, 0, 5);

                        var cr = c.Consume(timeSpan);
                        // Thread.Sleep(5000);
                        if (cr != null)
                        {
                            message = cr.Value;
                            Console.WriteLine("Thread" + Thread.CurrentThread.ManagedThreadId + "Message : " + message);

                            CLogger.WriteLog(ELogLevel.INFO, $"Consumed message Partition '{cr.Partition}' at: '{cr.TopicPartitionOffset} thread: { Thread.CurrentThread.ManagedThreadId}'. Message: {message}");
                            //Console.WriteLine($"Consumed message Partition '{cr.Partition}' at: '{cr.TopicPartitionOffset}'. Topic: { cr.Topic} value :{cr.Value} Timestamp :{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture)} GrpId: { conf.GroupId}");
                            c.Commit();
                        }
                        Console.WriteLine($"Calling the next Poll ");
                    }

                    catch (ConsumeException e)
                    {
                        CLogger.WriteLog(ELogLevel.ERROR, $"Error occured: {e.Error.Reason}");

                        Console.WriteLine($"Error occured: {e.Error.Reason}");
                    }
                    //consuming = false;
                }
                // Ensure the consumer leaves the group cleanly and final offsets are committed.
                c.Close();
            }
            catch (Exception ex)
            {

            }
        }

        return message;
    }

What is the issue with this code or there is installation issue with kafka

Upvotes: 0

Views: 2011

Answers (2)

M.Hamza Ali
M.Hamza Ali

Reputation: 103

Try removing the timespan from consume method.

Upvotes: 0

OneCricketeer
OneCricketeer

Reputation: 191854

Is there a Producer actively sending data?

Your consumer is starting from the latest offsets based on the AutoOffsetReset, so it wouldn't read existing data in the topic

The console consumer also defaults to the latest offset

And if you haven't changed the GroupId, then your consumer might have worked once, then you consumed data, then commited the offsets for that group. When the consumer starts again in the same group, it will only resume from the end of the topic, or the offset of the last commit

You also have an empty catch (Exception ex), which might be hiding some other error

Upvotes: 1

Related Questions