Reputation: 1760
I'm struggling with an issue that I cannot understand. With this configuration, I can successfully produce a message:
var config = new ProducerConfig
{
    BootstrapServers = bootstrapServers,
    SecurityProtocol = SecurityProtocol.SaslSsl,
    SaslMechanism = SaslMechanism.Plain,
    SaslUsername = saslUsername,
    SaslPassword = saslPassword
};
using (var producer = new ProducerBuilder<Null, string>(config).Build())
{
    Console.WriteLine("Enter a message to send to the topic (or 'exit' to quit):");
    string message;
    while ((message = Console.ReadLine()) != "exit")
    {
        try
        {
            var result = await producer.ProduceAsync(topic, new Message<Null, string> { Value = message });
            Console.WriteLine($"Message '{message}' sent to topic '{topic}' at offset {result.Offset}");
        }
        catch (ProduceException<Null, string> e)
        {
            Console.WriteLine($"Delivery failed: {e.Error.Reason}");
        }
    }
}
It works without any issues. I can go to my Azure portal and see every message I write.
Then I do the same with a consumer (another console app like this):
var config = new ConsumerConfig
{
    BootstrapServers = bootstrapServers,
    SecurityProtocol = SecurityProtocol.SaslSsl,
    SaslMechanism = SaslMechanism.Plain,
    SaslUsername = saslUsername,
    SaslPassword = saslPassword,
    GroupId = consumerGroup,
    SslEndpointIdentificationAlgorithm = SslEndpointIdentificationAlgorithm.None, // Disable endpoint identification
    //SessionTimeoutMs = 6000, // To reduce disconnection frequency
    //MaxPollIntervalMs = 10000,
    //SocketKeepaliveEnable = true,
    Debug = "all", // Enables detailed logs
    AutoOffsetReset = AutoOffsetReset.Latest,
    EnableAutoCommit = true,
    SocketKeepaliveEnable = true
};
...
consumer.Subscribe(topic);
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true;
    cts.Cancel();
};
try
{
    while (true)
    {
        try
        {
            var consumeResult = consumer.Consume(cts.Token);
            string logMessage = $"Consumed message '{consumeResult.Message.Value}' at: '{consumeResult.TopicPartitionOffset}'.";
Once I call consumer.Consume(cts.Token), I never receive any messages.
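For reference, here is a minimal sketch of how the truncated loop above might be completed. It assumes the consumer is built as ConsumerBuilder<Ignore, string> (the builder call is elided above), and the ConsumerSketch class and its Run method are hypothetical names used only for illustration. The error handler is there because some client-level errors are reported through it rather than as exceptions from Consume().

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

static class ConsumerSketch
{
    // Hypothetical helper: `config` and `topic` are assumed to be the same
    // ConsumerConfig and topic name used in the question.
    public static void Run(ConsumerConfig config, string topic, CancellationToken token)
    {
        using var consumer = new ConsumerBuilder<Ignore, string>(config)
            // Report client-level errors that are not thrown by Consume().
            .SetErrorHandler((_, e) =>
                Console.WriteLine($"Client error: {e.Reason} (fatal: {e.IsFatal})"))
            .Build();

        consumer.Subscribe(topic);
        try
        {
            while (true)
            {
                try
                {
                    // Blocks until a message arrives or the token is cancelled.
                    var result = consumer.Consume(token);
                    Console.WriteLine(
                        $"Consumed message '{result.Message.Value}' at: '{result.TopicPartitionOffset}'.");
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Consume failed: {e.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation was requested (e.g. Ctrl+C): fall through to Close().
        }
        finally
        {
            // Leave the consumer group cleanly before disposing.
            consumer.Close();
        }
    }
}
```

It would be called as ConsumerSketch.Run(config, topic, cts.Token) after wiring up Console.CancelKeyPress as shown above.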
I dug into the logs, and the only problems I see (if they are problems) are entries like these:
[Confluent.Kafka] Debug: [thrd:main]: Topic anothertest [2]: stored offset INVALID (leader epoch -1), committed offset INVALID (leader epoch -1): not including in commit
[Confluent.Kafka] Debug: [thrd:main]: anothertest [1]: skipping offset validation for offset 7 (leader epoch -1): no leader epoch set
[Confluent.Kafka] Debug: [thrd:main]: anothertest [1]: skipping offset validation for offset 7 (leader epoch -1): no leader epoch set
[Confluent.Kafka] Debug: [thrd:main]: Topic anothertest [2]: broker is down: re-query
I suspect the problem is somewhere on the "reading" side, but I can't tell what it is. I have tried every combination of settings I could think of, with no luck.
Any idea or suggestion as to what to try next?
Upvotes: 1
Views: 54
Reputation: 1760
I found a "workaround" that works well enough, and I hope it helps others. I ran the same test with the Python version of the Confluent.Kafka client and, after comparing the logs, noticed that the Python client reports client.software.version 2.5.3, while the .NET client reports 2.6.0.
Since both are wrappers around the same core library (librdkafka), I switched to the 2.5.3 NuGet package in my .NET project, and everything started working perfectly!
It seems that version 2.6.0 introduced some changes (possibly related to TLS) that cause communication with Azure Event Hubs to fail. I plan to file a bug report.
I hope this helps someone else.
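To confirm which native library version the .NET client actually loads before and after changing the package, a small check along these lines may help. It is only a sketch and relies on the Library.VersionString property exposed by Confluent.Kafka; the package itself can be pinned with `dotnet add package Confluent.Kafka --version 2.5.3`.

```csharp
using System;
using Confluent.Kafka;

// Prints the version of the native librdkafka library loaded by the .NET client.
Console.WriteLine($"librdkafka version: {Library.VersionString}");
```

If the printed version still shows 2.6.0 after the downgrade, the project is probably still resolving the newer package (for example through a transitive reference).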
Upvotes: 2