Reputation: 9
I am trying to create a Kafka Consumer that consumes data from a topic, using MassTransit. As part of this, I am creating integration tests for it using Testcontainers, in particular a Kafka Testcontainer.
The problem I am facing is that the consumer fails to connect to the Kafka Container topic, and so it's not receiving any messages.
I've tried many different port configurations for the Kafka container, but none of them has worked.
MassTransit setup:
builder.ConfigureServices((hostContext, services) =>
{
    var topicName = configuration["Kafka:TopicName"];
    var consumerGroupName = configuration["Kafka:ConsumerGroupName"];

    services.AddLogging(c => c.AddConsole());
    services.AddSingleton(configuration);
    services.AddMediatR(cfg => cfg.RegisterServicesFromAssemblyContaining<Program>());

    services.AddMassTransit(x =>
    {
        // The bus itself runs in memory; Kafka is attached as a rider.
        x.UsingInMemory((context, config) => config.ConfigureEndpoints(context));
        x.AddRider(rider =>
        {
            rider.AddConsumer<LocationsMessageConsumer>();
            rider.UsingKafka((context, k) =>
            {
                // Bootstrap server address, read from configuration.
                k.Host(configuration["Kafka:Endpoint"]);
                k.TopicEndpoint<Null, string>(topicName, consumerGroupName, e =>
                {
                    e.ConfigureConsumer<LocationsMessageConsumer>(context);
                });
            });
        });
    });
});
Basic Consumer:
public class KafkaMessageConsumer : IConsumer<string>
{
    public virtual Task Consume(ConsumeContext<string> context)
    {
        return Task.CompletedTask;
    }
}
Kafka Container Setup:
public async Task<KafkaContainer> SetupKafkaContainer(INetwork network)
{
    KafkaContainer kafkaContainer = new KafkaBuilder()
        .WithImage("confluentinc/cp-kafka:latest")
        .WithNetwork(network)
        .WithName(_kafkaContainerName)
        .WithPortBinding(_kafkaPort, 9092)
        .WithEnvironment("ALLOW_PLAINTEXT_LISTENER", "yes")
        .WithEnvironment("KAFKA_CFG_LISTENERS", $"PLAINTEXT://:9092")
        .WithEnvironment("KAFKA_CFG_ADVERTISED_LISTENERS", $"PLAINTEXT://localhost:9092")
        .Build();

    await kafkaContainer.StartAsync();
    return kafkaContainer;
}
Error seen in consumer container:
warn: MassTransit[0] Consumer [] error (Local_Transport): localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT) on locations
warn: MassTransit[0] Consumer [] error (Local_AllBrokersDown): 1/1 brokers are down on locations
I have tried using a non-MassTransit sender and receiver to confirm that messages can be sent through the topic without issue.
[2023-09-22 14:49:19,695] DEBUG [PartitionStateMachine controllerId=1] Started partition state machine with initial state -> HashMap() (kafka.controller.ZkPartitionStateMachine)
[2023-09-22 14:49:19,695] INFO [Controller id=1] Ready to serve as the new controller with epoch 1 (kafka.controller.KafkaController)
[2023-09-22 14:49:19,695] WARN [RequestSendThread controllerId=1] Controller 1's connection to broker 172.31.0.2:9093 (id: 1 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
java.io.IOException: Connection to 172.31.0.2:9093 (id: 1 rack: null) failed.
    at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:70)
    at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:296)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:249)
    at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
[2023-09-22 14:49:19,698] INFO [Controller id=1, targetBrokerId=1] Client requested connection close from node 1 (org.apache.kafka.clients.NetworkClient)
Upvotes: 0
Views: 952
Reputation: 33457
Well, from the error you posted, it's trying to connect to 127.0.0.1, which is the default if you don't configure a host. It seems that your:
k.Host(configuration["Kafka:Endpoint"]);
isn't returning what you expect. It should be of type string[] and contain the list of host names/ports where Kafka is reachable, such as:
new []{ "host-a:9092", "host-b:9092" }
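One way to rule this out is to validate the setting before handing it to MassTransit. The following is only a minimal sketch: the helper name ParseBootstrapServers and the comma-separated format of Kafka:Endpoint are assumptions for illustration, not part of MassTransit or of the question's code.

```csharp
using System;
using System.Linq;

public static class KafkaEndpointConfig
{
    // Hypothetical helper: splits a comma-separated "host:port" list and
    // fails fast when the setting is missing or empty, so a bad
    // configuration value surfaces at startup instead of as a
    // connection error later.
    public static string[] ParseBootstrapServers(string? raw)
    {
        if (string.IsNullOrWhiteSpace(raw))
            throw new InvalidOperationException("Kafka:Endpoint is not configured.");

        var servers = raw
            .Split(',', StringSplitOptions.RemoveEmptyEntries)
            .Select(s => s.Trim())
            .Where(s => s.Length > 0)
            .ToArray();

        if (servers.Length == 0)
            throw new InvalidOperationException("Kafka:Endpoint contains no host:port entries.");

        return servers;
    }
}
```

The result could then be passed along as k.Host(KafkaEndpointConfig.ParseBootstrapServers(configuration["Kafka:Endpoint"])), and logging the returned array at startup would show exactly which address the consumer is being told to use.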
Upvotes: 0
Reputation: 191884
Can you get container logs? I suspect it's not actually starting at all.
Confluent images don't use the KAFKA_CFG prefix; only the bitnami/kafka image does. The configs you've set are therefore not picked up, which leaves the server misconfigured.
You'll also need KAFKA_ZOOKEEPER_CONNECT, for example
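To illustrate the naming difference, here is a rough sketch of the same settings written with the plain KAFKA_ prefix that Confluent images read. The helper ConfluentKafkaEnvironment, its parameters, and the example values (a ZooKeeper address of zookeeper:2181 and an advertised host of kafka) are assumptions for illustration only; the Testcontainers Kafka module may already set some of these variables itself, in which case overriding them might be unnecessary.

```csharp
using System.Collections.Generic;

public static class ConfluentKafkaEnvironment
{
    // Hypothetical helper: builds environment variables using the KAFKA_
    // prefix instead of the KAFKA_CFG_ prefix used by bitnami/kafka.
    public static IReadOnlyDictionary<string, string> Build(
        string zookeeperAddress, string advertisedHost, int port)
    {
        return new Dictionary<string, string>
        {
            // Where the broker finds ZooKeeper (assumed to be running separately).
            ["KAFKA_ZOOKEEPER_CONNECT"] = zookeeperAddress,
            // Interface and port the broker binds to inside the container.
            ["KAFKA_LISTENERS"] = $"PLAINTEXT://0.0.0.0:{port}",
            // Address the broker hands back to clients after the first connection.
            ["KAFKA_ADVERTISED_LISTENERS"] = $"PLAINTEXT://{advertisedHost}:{port}",
        };
    }
}
```

A dictionary like ConfluentKafkaEnvironment.Build("zookeeper:2181", "kafka", 9092) could be passed to the builder's WithEnvironment overload that accepts a dictionary. Since the consumer in the question appears to run in its own container, the advertised host would probably need to be a name that container can resolve on the shared network, rather than localhost.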
Upvotes: 0