Reputation: 63
I am trying to connect, from my IntegrationTest.cs file, to the Kafka port that I exposed in the AppHost. After connecting, I want to list all Kafka topics, then open a specific topic and read the messages in it.
Apphost.cs
// Create the builder for the distributed application (AppHost entry point).
var builder = DistributedApplication.CreateBuilder(args);
// The quote-mark lines below are placeholders from the original post for omitted code;
// apiService and dacpac, used further down, are presumably declared there.
"
"
"
"
// Register a Kafka resource named "kafka" on port 9200 and attach a Kafka UI on host port 9100.
var kafka = builder.AddKafka("kafka", port: 9200)
.WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100))
// NOTE(review): Session lifetime presumably ties the container to the AppHost run — confirm in the Aspire docs.
.WithLifetime(ContainerLifetime.Session);
// Register the Kafka publisher project; it references apiService and kafka, waits for both,
// and also waits for dacpac to complete.
// NOTE(review): the second argument ("Aspire") is presumably the launch profile name — confirm.
builder.AddProject<Projects.O_WP_PIT_KafkaPublisher_Service>("kafkaPublisher", "Aspire")
.WithReference(apiService)
.WaitFor(apiService)
.WithReference(kafka)
.WaitFor(kafka)
.WaitForCompletion(dacpac);
// Build the application model and run it.
builder.Build().Run();
**IntegrationTest.cs**
// Create a testing builder for the AppHost project.
_appHost = await DistributedApplicationTestingBuilder.CreateAsync<Projects.PI_AspireSolution_AppHost>();
// Add the standard resilience handler to the default HttpClient configuration of the test host.
_appHost.Services.ConfigureHttpClientDefaults(clientBuilder =>
{
clientBuilder.AddStandardResilienceHandler();
});
// Build the application and start it.
_app = await _appHost.BuildAsync();
await _app.StartAsync();
// Wait for the "kafka" resource to reach the Running state, giving up after 120 seconds.
// NOTE(review): _resourceNotificationService is not assigned in this snippet — presumably it is
// resolved from _app.Services elsewhere; confirm.
await _resourceNotificationService.WaitForResourceAsync("kafka", KnownResourceStates.Running).WaitAsync(TimeSpan.FromSeconds(120));
After this, I'm not sure how to connect to Kafka and read all the topics and the messages in each topic.
Upvotes: 0
Views: 36
Reputation: 193
Configure the Kafka broker address and a consumer group, then retrieve the cluster metadata with GetMetadata to list the topics. See the example below:
using Confluent.Kafka;
using System;
using System.Collections.Generic;
/// <summary>
/// Console sample that subscribes to every topic the broker reports and prints each consumed message.
/// </summary>
class Program
{
    /// <summary>
    /// Builds a consumer, subscribes it to all topics returned by <c>GetAllTopics</c>,
    /// and prints messages until Ctrl+C is pressed.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        var config = new ConsumerConfig
        {
            GroupId = "your-consumer-group",
            BootstrapServers = "localhost:9092",
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        using (var cts = new System.Threading.CancellationTokenSource())
        using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
        {
            // Subscribe to all topics
            var topics = GetAllTopics(consumer);
            if (topics.Count == 0)
            {
                // Subscribing to an empty list would leave the loop below waiting for messages that never arrive.
                Console.WriteLine("No topics found.");
                return;
            }

            consumer.Subscribe(topics);
            Console.WriteLine("Listening to topics...");

            // Without a token, Consume() blocks indefinitely and the OperationCanceledException
            // handler below could never run; Ctrl+C now cancels the loop.
            ConsoleCancelEventHandler onCancel = (sender, e) =>
            {
                e.Cancel = true; // keep the process alive so the consumer can be closed cleanly
                cts.Cancel();
            };
            Console.CancelKeyPress += onCancel;

            try
            {
                while (true)
                {
                    var cr = consumer.Consume(cts.Token);
                    Console.WriteLine($"Consumed message '{cr.Value}' from topic '{cr.Topic}'");
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when Ctrl+C is pressed; fall through to the cleanup below.
            }
            finally
            {
                Console.CancelKeyPress -= onCancel;
                // Close on every exit path, not only on cancellation.
                consumer.Close();
            }
        }
    }

    /// <summary>
    /// Returns the names of all topics reported by the cluster metadata.
    /// </summary>
    /// <param name="consumer">Consumer whose client handle is reused for the metadata request.</param>
    /// <returns>The topic names; empty when the cluster reports no topics.</returns>
    static List<string> GetAllTopics(IConsumer<Ignore, string> consumer)
    {
        // Metadata queries are exposed on IAdminClient; borrowing the consumer's handle
        // avoids opening a second connection to the cluster.
        using (var adminClient = new DependentAdminClientBuilder(consumer.Handle).Build())
        {
            var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(10));
            var topics = new List<string>(metadata.Topics.Count);
            foreach (var topic in metadata.Topics)
            {
                topics.Add(topic.Topic);
            }
            return topics;
        }
    }
}
Upvotes: 0
Reputation: 63
Thanks for the answer; it works about 95% of the time.
I am facing one small issue.
When the integration test runs, only the first topic (healthchecks-topic) is listed, but not the second topic (dnd-pit-dev-act-outage-topic).
The second topic takes a little longer to appear, which eventually causes the test to fail.
Is there a way to make the test wait until the expected topic appears in Kafka?
Upvotes: 0
Reputation: 2960
First of all, make sure you have the Confluent.Kafka
and xUnit
NuGet packages installed in your test project:
dotnet add package Confluent.Kafka
dotnet add package XUnit
Once you have confirmed that Kafka is running (i.e. WaitForResourceAsync
completes successfully), you can get the list of topics as follows:
/// <summary>
/// Returns the names of all topics reported by the Kafka cluster metadata.
/// </summary>
/// <param name="bootstrapServers">Broker address list, e.g. <c>host:port</c>.</param>
/// <returns>A task that completes with the list of topic names.</returns>
public async Task<List<string>> GetAllTopicsAsync(string bootstrapServers)
{
    // GetMetadata blocks the calling thread (up to the 10 s timeout). The original method was
    // marked async but never awaited, so it ran synchronously; run the call on the thread pool instead.
    return await Task.Run(() =>
    {
        using var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build();
        var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(10));
        return metadata.Topics.Select(t => t.Topic).ToList();
    });
}
Once you have the topic list, you can create a Kafka Consumer to read messages.
/// <summary>
/// Subscribes to <paramref name="topicName"/> and prints up to five messages to the console.
/// </summary>
/// <param name="bootstrapServers">Broker address list, e.g. <c>host:port</c>.</param>
/// <param name="topicName">Topic to read from, starting at the earliest offset.</param>
/// <returns>A task that completes when the five consume attempts have finished.</returns>
public async Task ReadMessagesFromTopicAsync(string bootstrapServers, string topicName)
{
    var config = new ConsumerConfig
    {
        BootstrapServers = bootstrapServers,
        GroupId = "test-consumer-group",
        AutoOffsetReset = AutoOffsetReset.Earliest, // Read from the beginning
        EnableAutoCommit = false // Don't auto-commit offsets
    };

    // Consume(TimeSpan) blocks the calling thread (up to 5 x 10 s here). The original method was
    // marked async but never awaited, so it ran synchronously; run the loop on the thread pool instead.
    await Task.Run(() =>
    {
        using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        consumer.Subscribe(topicName);
        try
        {
            for (int i = 0; i < 5; i++) // Read 5 messages for testing
            {
                // A null result means the 10 s timeout elapsed without a message.
                var consumeResult = consumer.Consume(TimeSpan.FromSeconds(10));
                if (consumeResult != null)
                {
                    Console.WriteLine($"Received message: {consumeResult.Message.Value}");
                }
            }
        }
        catch (Exception ex)
        {
            // Best-effort sample: report the failure instead of propagating it.
            Console.WriteLine($"Error while consuming: {ex.Message}");
        }
        finally
        {
            consumer.Close();
        }
    });
}
The complete code of IntegrationTest.cs
is as follows:
using Xunit;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
/// <summary>
/// Integration test that verifies a Kafka topic exists and that messages can be read from it.
/// </summary>
/// <remarks>
/// NOTE(review): <c>_resourceNotificationService</c> is not declared in this class. It is presumably
/// provided by setup code that starts the Aspire AppHost — confirm and wire it in before running.
/// NOTE(review): the bootstrap address is hard-coded; verify it matches the port the test host
/// actually exposes for the Kafka resource.
/// </remarks>
public class KafkaIntegrationTest
{
    private readonly string _kafkaBootstrapServers = "localhost:9200"; // Change if needed

    // Topics may be created some time after the broker reports Running, so the test polls
    // the metadata for up to this long instead of asserting on the first snapshot.
    private static readonly TimeSpan TopicWaitTimeout = TimeSpan.FromSeconds(60);

    // Pause between two metadata polls.
    private static readonly TimeSpan TopicPollInterval = TimeSpan.FromSeconds(1);

    /// <summary>
    /// Waits for Kafka and for <paramref name="topicName"/> to exist, then asserts that
    /// messages with the expected content can be read from it.
    /// </summary>
    /// <param name="topicName">Topic that must exist and contain messages.</param>
    [Theory]
    [InlineData("my-topic")] //Replace this with your desired topic
    [InlineData("another-topic")] //Replace this with your desired topic
    public async Task Kafka_Should_Contain_Topic_And_Read_Messages(string topicName)
    {
        // Wait for Kafka to be up
        await _resourceNotificationService.WaitForResourceAsync("kafka", KnownResourceStates.Running)
            .WaitAsync(TimeSpan.FromSeconds(120));

        // Get all topics, polling until the expected one appears or the timeout elapses
        var topics = await WaitForTopicAsync(_kafkaBootstrapServers, topicName, TopicWaitTimeout);

        // Assert topic exists
        Assert.Contains(topicName, topics);

        // Read messages from the topic
        var messages = await ReadMessagesFromTopicAsync(_kafkaBootstrapServers, topicName);

        // Assert messages are not empty
        Assert.NotEmpty(messages);

        // (Optional) Assert specific expected message content
        Assert.Contains(messages, msg => msg.Contains("expected-message-content"));
    }

    /// <summary>
    /// Polls the cluster metadata until <paramref name="topicName"/> is listed or
    /// <paramref name="timeout"/> elapses.
    /// </summary>
    /// <param name="bootstrapServers">Broker address list.</param>
    /// <param name="topicName">Topic to wait for.</param>
    /// <param name="timeout">Maximum time to keep polling.</param>
    /// <returns>The most recent topic list, so a failed assertion shows what was present.</returns>
    private async Task<List<string>> WaitForTopicAsync(string bootstrapServers, string topicName, TimeSpan timeout)
    {
        var deadline = DateTime.UtcNow + timeout;
        while (true)
        {
            List<string> topics;
            try
            {
                topics = await GetAllTopicsAsync(bootstrapServers);
            }
            catch (KafkaException) when (DateTime.UtcNow < deadline)
            {
                // The broker may not accept connections yet; retry until the deadline,
                // after which the exception is allowed to propagate.
                topics = new List<string>();
            }

            if (topics.Contains(topicName) || DateTime.UtcNow >= deadline)
            {
                return topics;
            }

            await Task.Delay(TopicPollInterval);
        }
    }

    /// <summary>
    /// Returns the names of all topics reported by the Kafka cluster metadata.
    /// </summary>
    /// <param name="bootstrapServers">Broker address list.</param>
    /// <returns>A task that completes with the list of topic names.</returns>
    private async Task<List<string>> GetAllTopicsAsync(string bootstrapServers)
    {
        // GetMetadata blocks the calling thread; run it on the thread pool so the test stays asynchronous.
        return await Task.Run(() =>
        {
            using var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build();
            var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(10));
            return metadata.Topics.Select(t => t.Topic).ToList();
        });
    }

    /// <summary>
    /// Reads up to five messages from <paramref name="topicName"/>, starting at the earliest offset.
    /// </summary>
    /// <param name="bootstrapServers">Broker address list.</param>
    /// <param name="topicName">Topic to read from.</param>
    /// <returns>The message values that were received; empty when none arrived in time.</returns>
    private async Task<List<string>> ReadMessagesFromTopicAsync(string bootstrapServers, string topicName)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = bootstrapServers,
            GroupId = "test-consumer-group",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnableAutoCommit = false
        };

        // Consume(TimeSpan) blocks the calling thread; run the loop on the thread pool.
        return await Task.Run(() =>
        {
            var messages = new List<string>();
            using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
            consumer.Subscribe(topicName);
            try
            {
                for (int i = 0; i < 5; i++) // Read up to 5 messages
                {
                    // A null result means the 5 s timeout elapsed without a message.
                    var consumeResult = consumer.Consume(TimeSpan.FromSeconds(5));
                    if (consumeResult != null)
                    {
                        messages.Add(consumeResult.Message.Value);
                    }
                }
            }
            finally
            {
                consumer.Close();
            }
            return messages;
        });
    }
}
Upvotes: 1