AbdulQASDET
AbdulQASDET

Reputation: 63

How to connect to kafka port and do integration testing by reading topic and message

I am trying to connect to Kafka port exposed by me in apphost in integrationtest.cs file after connecting to port, go and read all kafka topics and open a specific topic and read the messages in it

Apphost.cs

var builder = DistributedApplication.CreateBuilder(args);
"
"
"
"
var kafka = builder.AddKafka("kafka", port: 9200)
    .WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100))
    .WithLifetime(ContainerLifetime.Session);

builder.AddProject<Projects.O_WP_PIT_KafkaPublisher_Service>("kafkaPublisher", "Aspire")
    .WithReference(apiService)
    .WaitFor(apiService)
    .WithReference(kafka)
    .WaitFor(kafka)
    .WaitForCompletion(dacpac);

builder.Build().Run();

**IntegrationTest.cs **

_appHost = await DistributedApplicationTestingBuilder.CreateAsync<Projects.PI_AspireSolution_AppHost>();
_appHost.Services.ConfigureHttpClientDefaults(clientBuilder =>
{
    clientBuilder.AddStandardResilienceHandler();
});
_app = await _appHost.BuildAsync();
await _app.StartAsync();
        await _resourceNotificationService.WaitForResourceAsync("kafka", KnownResourceStates.Running).WaitAsync(TimeSpan.FromSeconds(120));

after this im not sure how to connect to kafka and read all the topics and messages in each topic

Upvotes: 0

Views: 36

Answers (3)

Srikanth Kotnala
Srikanth Kotnala

Reputation: 193

Configure Kafka broker and consumer group and then try to get metadata information using consumer.GetMetadata. See below

using Confluent.Kafka;
using System;
using System.Collections.Generic;

class Program
{
static void Main(string[] args)
{
    var config = new ConsumerConfig
    {
        GroupId = "your-consumer-group",
        BootstrapServers = "localhost:9092",
        AutoOffsetReset = AutoOffsetReset.Earliest
    };

    using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
    {
        // Subscribe to all topics
        var topics = GetAllTopics(consumer);
        consumer.Subscribe(topics);

        Console.WriteLine("Listening to topics...");

        try
        {
            while (true)
            {
                var cr = consumer.Consume();
                Console.WriteLine($"Consumed message '{cr.Value}' from topic '{cr.Topic}'");
            }
        }
        catch (OperationCanceledException)
        {
            consumer.Close();
        }
    }
}

static List<string> GetAllTopics(IConsumer<Ignore, string> consumer)
{
    var metadata = consumer.GetMetadata(TimeSpan.FromSeconds(10));
    var topics = new List<string>();
    foreach (var topic in metadata.Topics)
    {
        topics.Add(topic.Topic);
    }
    return topics;
}
}

Upvotes: 0

AbdulQASDET
AbdulQASDET

Reputation: 63

Thanks for the answer and it worked like 95% success

I am facing a slight issue

when integration test is running, only 1st topic is shown which is healthchecks-topic but not 2nd topic dnd-pit-dev-act-outage-topic

it is taking some more time to show 2nd topic which eventually is failing test

any way can we ask kafka to wait until expected topic is shown ?

enter image description here

Upvotes: 0

Mohammad Aghazadeh
Mohammad Aghazadeh

Reputation: 2960

first of all Make sure you have the Confluent.Kafka and XUnit NuGet packages installed in your test project:

dotnet add package Confluent.Kafka
dotnet add package XUnit 

Once you ensure Kafka is running (WaitForResourceAsync completes successfully), you need Get List of Topics as follows :

public async Task<List<string>> GetAllTopicsAsync(string bootstrapServers)
{
    using var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build();
    var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(10));
    return metadata.Topics.Select(t => t.Topic).ToList();
}

Once you have the topic list, you can create a Kafka Consumer to read messages.

public async Task ReadMessagesFromTopicAsync(string bootstrapServers, string topicName)
{
    var config = new ConsumerConfig
    {
        BootstrapServers = bootstrapServers,
        GroupId = "test-consumer-group",
        AutoOffsetReset = AutoOffsetReset.Earliest, // Read from the beginning
        EnableAutoCommit = false // Don't auto-commit offsets
    };

    using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
    consumer.Subscribe(topicName);

    try
    {
        for (int i = 0; i < 5; i++) // Read 5 messages for testing
        {
            var consumeResult = consumer.Consume(TimeSpan.FromSeconds(10));
            if (consumeResult != null)
            {
                Console.WriteLine($"Received message: {consumeResult.Message.Value}");
            }
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Error while consuming: {ex.Message}");
    }
    finally
    {
        consumer.Close();
    }
}

The complete code of IntegrationTest.cs will be as follows :

using Xunit;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;

public class KafkaIntegrationTest
{
    private readonly string _kafkaBootstrapServers = "localhost:9200"; // Change if needed

    [Theory]
    [InlineData("my-topic")]   //Replace this with your desired topic
    [InlineData("another-topic")] //Replace this with your desired topic  
    public async Task Kafka_Should_Contain_Topic_And_Read_Messages(string topicName)
    {
        // Wait for Kafka to be up
        await _resourceNotificationService.WaitForResourceAsync("kafka", KnownResourceStates.Running)
                                          .WaitAsync(TimeSpan.FromSeconds(120));

        // Get all topics
        var topics = await GetAllTopicsAsync(_kafkaBootstrapServers);
        
        // Assert topic exists
        Assert.Contains(topicName, topics);

        // Read messages from the topic
        var messages = await ReadMessagesFromTopicAsync(_kafkaBootstrapServers, topicName);

        // Assert messages are not empty
        Assert.NotEmpty(messages);

        // (Optional) Assert specific expected message content
        Assert.Contains(messages, msg => msg.Contains("expected-message-content"));
    }

    private async Task<List<string>> GetAllTopicsAsync(string bootstrapServers)
    {
        using var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build();
        var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(10));
        return metadata.Topics.Select(t => t.Topic).ToList();
    }

    private async Task<List<string>> ReadMessagesFromTopicAsync(string bootstrapServers, string topicName)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = bootstrapServers,
            GroupId = "test-consumer-group",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnableAutoCommit = false
        };

        var messages = new List<string>();

        using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        consumer.Subscribe(topicName);

        try
        {
            for (int i = 0; i < 5; i++) // Read up to 5 messages
            {
                var consumeResult = consumer.Consume(TimeSpan.FromSeconds(5));
                if (consumeResult != null)
                {
                    messages.Add(consumeResult.Message.Value);
                }
            }
        }
        finally
        {
            consumer.Close();
        }

        return messages;
    }
}

Upvotes: 1

Related Questions