Shinchoku
Shinchoku

Reputation: 63

.NET, Kafka: Why does my consumer stop consuming when I recreate the topic, or when the consumer starts before the topics have been created?

This is my consumer

using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace commons.Kafka;

/// <summary>
/// Hosted background service that subscribes to one or more Kafka topics and hands
/// every consumed message to <see cref="ProcessMessageAsync"/>.
/// </summary>
/// <typeparam name="T">
/// Payload type. When <typeparamref name="T"/> has a non-object <see cref="TypeCode"/>
/// (e.g. <see cref="string"/>) the raw message value is passed through with an
/// <c>as</c> cast; otherwise the value is deserialized from JSON.
/// </typeparam>
public abstract class KafkaConsumer<T> : BackgroundService where T : class
{
    // Pause applied after a failed Consume call so that a persistent broker-side error
    // does not turn the consume loop into a tight retry/log loop.
    private static readonly TimeSpan ConsumeErrorBackoff = TimeSpan.FromSeconds(1);

    private readonly IConsumer<string, string> _consumer;
    private readonly string _groupId;
    private readonly ILogger<KafkaConsumer<T>> _logger;

    /// <summary>
    /// Builds the underlying consumer and subscribes it to <paramref name="topics"/>.
    /// The consumer group id is the kebab-cased name of the concrete subclass.
    /// </summary>
    /// <param name="logger">Logger used for lifecycle and error messages.</param>
    /// <param name="config">Configuration section; <c>Kafka:BootstrapServers</c> is read from it.</param>
    /// <param name="topics">Topics to subscribe to.</param>
    protected KafkaConsumer(ILogger<KafkaConsumer<T>> logger, IConfigurationSection config, params string[] topics)
    {
        _logger = logger;
        _groupId = GetType().Name.ToKebabCase();
        _consumer = new ConsumerBuilder<string, string>(new ConsumerConfig
        {
            BootstrapServers = config["Kafka:BootstrapServers"],
            GroupId = _groupId,
            AutoOffsetReset = AutoOffsetReset.Earliest
        }).Build();

        _consumer.Subscribe(topics);
    }

    /// <summary>Handles a single consumed message.</summary>
    /// <param name="headers">Headers of the consumed message.</param>
    /// <param name="key">Key of the consumed message.</param>
    /// <param name="value">Message value, either cast or JSON-deserialized to <typeparamref name="T"/>.</param>
    protected abstract Task ProcessMessageAsync(Headers headers, string key, T value);

    /// <summary>
    /// Consumes at most one message and dispatches it to <see cref="ProcessMessageAsync"/>.
    /// Cancellation is swallowed; any other failure is logged and not rethrown, so the
    /// caller's loop keeps running.
    /// </summary>
    /// <param name="stoppingToken">Token that aborts the blocking consume call.</param>
    protected virtual async Task ConsumeAsync(CancellationToken stoppingToken)
    {
        try
        {
            var consumeResult = _consumer.Consume(stoppingToken);
            if (consumeResult is null)
            {
                return;
            }

            var message = consumeResult.Message;
            _logger.LogInformation("{Consumer}: consumed {Key}", _groupId, message.Key);

            // Only plain object types go through JSON; anything else (in practice string)
            // receives the raw value.
            var value = Type.GetTypeCode(typeof(T)) != TypeCode.Object
                ? message.Value as T
                : JsonConvert.DeserializeObject<T>(message.Value);

            await ProcessMessageAsync(message.Headers, message.Key, value);
        }
        catch (OperationCanceledException)
        {
            // Ignore cancellation exception
        }
        catch (ConsumeException ex)
        {
            _logger.LogError(ex, "{Consumer}: Error consuming message", _groupId);

            // Back off before the caller retries; without this a persistent consume
            // error is retried immediately and floods the log.
            await DelayAfterConsumeErrorAsync(stoppingToken);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "{Consumer}: Error consuming message", _groupId);
        }
    }

    /// <summary>
    /// Runs the consume loop until <paramref name="stoppingToken"/> is cancelled, then
    /// closes the consumer exactly once.
    /// </summary>
    /// <param name="stoppingToken">Token signalled when the host is shutting down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("{Consumer}: starting", _groupId);

        try
        {
            // Consume() blocks, so the whole loop is offloaded once instead of scheduling
            // a new task per message.
            await Task.Run(async () =>
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    await ConsumeAsync(stoppingToken);
                }
            });
        }
        finally
        {
            _logger.LogInformation("{Consumer}: stopping", _groupId);

            // Close only here, after the loop has exited: closing from a cancellation
            // callback races with an in-flight Consume() call and leads to a second
            // Close() on an already-closed consumer.
            try
            {
                _consumer.Close();
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "{Consumer}: Error closing consumer", _groupId);
            }
        }
    }

    // Waits for the error backoff period, returning early (without throwing) on shutdown.
    private static async Task DelayAfterConsumeErrorAsync(CancellationToken stoppingToken)
    {
        try
        {
            await Task.Delay(ConsumeErrorBackoff, stoppingToken);
        }
        catch (OperationCanceledException)
        {
            // Shutdown requested while backing off; let the caller's loop observe the token.
        }
    }
}

After recreating the topic, the consumer seems to be frozen. What happened?

Here is the log:

[16:17:35 INF] updated-consumer: starting

[16:17:35 INF] Now listening on: http://localhost:46000

[16:17:35 INF] Application started. Press Ctrl+C to shut down.

[16:17:35 INF] Hosting environment: Development

[16:17:35 INF] Content root path: D:\PYXIS.API.CHANGELOG\src\PYXIS.API.CHANGELOG

[16:18:05 INF] update-consumer: consumed 1

[16:18:05 INF] update-consumer: consumed 2

[16:18:06 INF] update-consumer: consumed 3

[16:18:20 INF] update-consumer: consumed 4

[16:18:22 INF] update-consumer: consumed 5

[16:18:24 INF] update-consumer: consumed 6

%5|1733390315.292|PARTCNT|rdkafka#consumer-1| [thrd:main]: Topic updated_record partition count changed from 1 to 0

%3|1733390315.292|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: updated_record [0]: desired partition is no longer available (Local: Unknown partition)

%3|1733390315.292|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: updated_record [0]: topic does not exist (Broker: Unknown topic or partition)

%3|1733390315.526|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: updated_record [0]: topic does not exist (Broker: Unknown topic or partition)

%3|1733390316.528|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: updated_record [0]: topic does not exist (Broker: Unknown topic or partition)

Upvotes: 0

Views: 42

Answers (0)

Related Questions