Reputation: 63
This is my consumer:
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace commons.Kafka;
/// <summary>
/// Base class for hosted Kafka consumers. Subscribes to the given topics on construction,
/// then consumes messages one at a time on a background task and hands each one to
/// <see cref="ProcessMessageAsync"/>.
/// </summary>
/// <typeparam name="T">
/// Payload type. When <c>Type.GetTypeCode(typeof(T))</c> is not <see cref="TypeCode.Object"/>
/// (for example <see cref="string"/>), the raw message value is passed through unchanged;
/// otherwise the value is deserialized from JSON with Newtonsoft.Json.
/// </typeparam>
public abstract class KafkaConsumer<T> : BackgroundService where T : class
{
    private readonly IConsumer<string, string> _consumer;
    private readonly string _groupId;
    private readonly ILogger<KafkaConsumer<T>> _logger;

    /// <summary>
    /// Builds the underlying consumer and subscribes it to <paramref name="topics"/>.
    /// </summary>
    /// <param name="logger">Logger used for lifecycle, per-message and error messages.</param>
    /// <param name="config">
    /// Configuration section from which the <c>Kafka:BootstrapServers</c> key is read.
    /// </param>
    /// <param name="topics">Topics to subscribe to.</param>
    protected KafkaConsumer(ILogger<KafkaConsumer<T>> logger, IConfigurationSection config, params string[] topics)
    {
        _logger = logger;

        // The consumer group id is derived from the concrete subclass name
        // (ToKebabCase is a project extension; presumably "FooConsumer" -> "foo-consumer" - confirm).
        _groupId = GetType().Name.ToKebabCase();

        _consumer = new ConsumerBuilder<string, string>(new ConsumerConfig
        {
            BootstrapServers = config["Kafka:BootstrapServers"],
            GroupId = _groupId,
            AutoOffsetReset = AutoOffsetReset.Earliest
        }).Build();
        _consumer.Subscribe(topics);
    }

    /// <summary>
    /// Handles a single consumed message.
    /// </summary>
    /// <param name="headers">Headers of the consumed message.</param>
    /// <param name="key">Key of the consumed message.</param>
    /// <param name="value">Raw or JSON-deserialized payload (see <typeparamref name="T"/>).</param>
    protected abstract Task ProcessMessageAsync(Headers headers, string key, T value);

    /// <summary>
    /// Consumes at most one message and dispatches it to <see cref="ProcessMessageAsync"/>.
    /// Cancellation is swallowed silently; every other exception is logged and swallowed so
    /// that the consume loop keeps running.
    /// </summary>
    /// <param name="stoppingToken">Token that aborts the blocking consume call.</param>
    protected virtual async Task ConsumeAsync(CancellationToken stoppingToken)
    {
        try
        {
            var consumeResult = _consumer.Consume(stoppingToken);
            if (consumeResult is null)
            {
                return;
            }

            var message = consumeResult.Message;
            _logger.LogInformation("{Consumer}: consumed {Key}", _groupId, message.Key);

            // Non-object type codes (e.g. string) receive the raw value; everything else is JSON.
            T value = Type.GetTypeCode(typeof(T)) != TypeCode.Object
                ? message.Value as T
                : JsonConvert.DeserializeObject<T>(message.Value);

            await ProcessMessageAsync(message.Headers, message.Key, value);
        }
        catch (OperationCanceledException)
        {
            // Shutdown was requested while waiting for a message; nothing to report.
        }
        catch (ConsumeException ex)
        {
            // Surface the Kafka error code and reason (e.g. an unknown topic or partition)
            // instead of only a generic message, so broker-side problems show up in the log.
            _logger.LogError(ex, "{Consumer}: Error consuming message ({Code}: {Reason})",
                _groupId, ex.Error.Code, ex.Error.Reason);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "{Consumer}: Error consuming message", _groupId);
        }
    }

    /// <summary>
    /// Runs the consume loop until <paramref name="stoppingToken"/> is cancelled, then closes
    /// the consumer exactly once.
    /// </summary>
    /// <param name="stoppingToken">Token signalled when the host is shutting down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("{Consumer}: starting", _groupId);

        // Consume() blocks, so the whole loop runs on one background task rather than
        // scheduling a new task per message. The task is not tied to stoppingToken so the
        // cleanup below always runs on the same task that was consuming, after the last
        // Consume call has returned - never concurrently with it.
        await Task.Run(async () =>
        {
            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    await ConsumeAsync(stoppingToken);
                }
            }
            finally
            {
                _logger.LogInformation("{Consumer}: stopping", _groupId);
                CloseConsumer();
            }
        }, CancellationToken.None);
    }

    /// <summary>
    /// Cancels the background work via the base class and releases the Kafka consumer.
    /// Covers the case where the service was constructed but never started.
    /// </summary>
    public override void Dispose()
    {
        base.Dispose();
        _consumer.Dispose();
    }

    // Closes the consumer once, logging rather than throwing so shutdown can complete.
    private void CloseConsumer()
    {
        try
        {
            _consumer.Close();
        }
        catch (ObjectDisposedException)
        {
            // The consumer was already released (e.g. Dispose ran first); nothing left to close.
        }
        catch (KafkaException ex)
        {
            _logger.LogError(ex, "{Consumer}: Error closing consumer", _groupId);
        }
    }
}
After recreating the topic, the consumer appears to be frozen. What happened?
Here is the log:
[16:17:35 INF] updated-consumer: starting
[16:17:35 INF] Now listening on: http://localhost:46000
[16:17:35 INF] Application started. Press Ctrl+C to shut down.
[16:17:35 INF] Hosting environment: Development
[16:17:35 INF] Content root path: D:\PYXIS.API.CHANGELOG\src\PYXIS.API.CHANGELOG
[16:18:05 INF] update-consumer: consumed 1
[16:18:05 INF] update-consumer: consumed 2
[16:18:06 INF] update-consumer: consumed 3
[16:18:20 INF] update-consumer: consumed 4
[16:18:22 INF] update-consumer: consumed 5
[16:18:24 INF] update-consumer: consumed 6
%5|1733390315.292|PARTCNT|rdkafka#consumer-1| [thrd:main]: Topic updated_record partition count changed from 1 to 0
%3|1733390315.292|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: updated_record [0]: desired partition is no longer available (Local: Unknown partition)
%3|1733390315.292|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: updated_record [0]: topic does not exist (Broker: Unknown topic or partition)
%3|1733390315.526|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: updated_record [0]: topic does not exist (Broker: Unknown topic or partition)
%3|1733390316.528|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: updated_record [0]: topic does not exist (Broker: Unknown topic or partition)
Upvotes: 0
Views: 42