tisaconundrum

Reputation: 2292

MQTT Client (MQTTnet) Not Successfully Subscribing to Topics Despite Connection to EMQX Broker

I'm building a .NET service that connects to an EMQX broker using MQTTnet. The client connects to EMQX, but when I publish test messages through MQTTX (an MQTT client tool), my service never receives them. The subscriptions don't seem to take effect, even though I call SubscribeAsync.

Code Structure

Program.cs (Service Setup)

var builder = Host.CreateApplicationBuilder(args);

// Configure services
builder.Services.AddSingleton<MQTTClient>();
builder.Services.AddHostedService<ServiceWorker>();

// Other configuration...

var host = builder.Build();
await host.RunAsync();

MQTTClient.cs

public class MQTTClient
{
    private readonly ILogger<MQTTClient> _logger;
    private readonly IConfiguration _configuration;
    private readonly IManagedMqttClient _mqttClient;

    public MQTTClient(ILogger<MQTTClient> logger, IConfiguration configuration)
    {
        _logger = logger;
        _configuration = configuration;

        _mqttClient = new MqttFactory().CreateManagedMqttClient();
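        // These handlers do no asynchronous work, so they return a completed task
        // instead of being declared async (which would raise warning CS1998).
        _mqttClient.ConnectedAsync += (MqttClientConnectedEventArgs eventArgs) =>
        {
            _logger.LogInformation("Connected to MQTT server");
            return Task.CompletedTask;
        };
        _mqttClient.DisconnectedAsync += (MqttClientDisconnectedEventArgs eventArgs) =>
        {
            _logger.LogInformation("Disconnected from MQTT server");
            return Task.CompletedTask;
        };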
        _mqttClient.ApplicationMessageReceivedAsync += HandleMessageReceived;
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        string connectionstring = _configuration["MQTT:Connectionstring"];
        string username = _configuration["MQTT:Username"];
        string password = _configuration["MQTT:Password"];
        string mqttClientId = $"FetchImportService-{Guid.NewGuid()}";

        bool ws = connectionstring.StartsWith("ws");

        var options = new MqttClientOptionsBuilder()
            .WithClientId(mqttClientId)
            .WithCredentials(username, password)
            .WithTls()
            .WithCleanSession();

        if (ws)
            options.WithWebSocketServer(connectionstring);
        else
            options.WithTcpServer(connectionstring);

        var builtOptions = options.Build();
        var mqttOptions = new ManagedMqttClientOptionsBuilder()
            .WithClientOptions(builtOptions)
            .WithAutoReconnectDelay(TimeSpan.FromSeconds(3))
            .WithPendingMessagesOverflowStrategy(MqttPendingMessagesOverflowStrategy.DropNewMessage)
            .Build();

        await _mqttClient.StartAsync(mqttOptions);
    }

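    public async Task SubscribeAsync()
    {
        // The managed client queues these filters and sends the SUBSCRIBE packet once connected.
        await _mqttClient.SubscribeAsync(_configuration["MQTT:Addtopic"]);
        await _mqttClient.SubscribeAsync(_configuration["MQTT:Bulkaddtopic"]);
    }

    // Assumption: this wrapper is not shown in the original post. ServiceWorker calls
    // StopAsync, so a minimal version is sketched here; the token is accepted but unused.
    public Task StopAsync(CancellationToken cancellationToken)
    {
        return _mqttClient.StopAsync();
    }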

    private async Task HandleMessageReceived(MqttApplicationMessageReceivedEventArgs eventArgs)
    {
        var topic = eventArgs.ApplicationMessage.Topic;
        _logger.LogInformation("Message received: {topic}", topic);

        try
        {
            if (topic.EndsWith(_configuration["MQTT:Addtopic"]))
            {
                _logger.LogInformation("Processing single client add request");
                var data = JsonSerializer.Deserialize<RequestData>(eventArgs.ApplicationMessage.Payload);
                // Process message...
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error processing message");
        }
    }
}

ServiceWorker.cs

public class ServiceWorker : BackgroundService
{
    private readonly ILogger<ServiceWorker> _logger;
    private readonly MQTTClient _mqttClient;

    public ServiceWorker(ILogger<ServiceWorker> logger, MQTTClient mqttClient)
    {
        _logger = logger;
        _mqttClient = mqttClient;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("ServiceWorker running at: {time}", DateTimeOffset.Now);

        await _mqttClient.StartAsync(stoppingToken);
        await _mqttClient.SubscribeAsync();
                
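        try
        {
            // Keep the worker alive until shutdown is requested.
            await Task.Delay(Timeout.Infinite, stoppingToken);
        }
        catch (OperationCanceledException)
        {
            // Expected on shutdown; fall through so the client is stopped.
        }

        await _mqttClient.StopAsync(stoppingToken);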
    }
}

Here are the logs

Using what @brits suggested, I was able to capture some log output. Here is what it looks like:


[15:18:18 INF] Connection with server established.
[15:18:18 INF] Start receiving packets.
[15:18:18 INF] TX (81 bytes) >>> Connect: [ClientId=FetchImportService-488b512d-0b99-4073-9d59-93675cb7b682] [Username=test] [Password=****] [KeepAlivePeriod=15] [CleanSession=True]
[15:18:18 INF] RX (4 bytes) <<< ConnAck: [ReturnCode=ConnectionAccepted] [ReasonCode=Success] [IsSessionPresent=False]
[15:18:18 INF] Authenticated MQTT connection with server established.
[15:18:18 INF] Connected.
[15:18:18 INF] Connected to MQTT server
[15:18:18 INF] Start sending keep alive packets.
[15:18:18 INF] Publishing subscriptions at reconnect
[15:18:18 INF] Publishing 2 added and 0 removed subscriptions
[15:18:18 INF] TX (74 bytes) >>> Subscribe: [PacketIdentifier=1] [TopicFilters=test/lookupservice/clients/add@AtLeastOnce,test/lookupservice/clients/bulkadd@AtLeastOnce]
[15:18:18 INF] RX (6 bytes) <<< SubAck: [PacketIdentifier=1] [ReasonCode=UnspecifiedError,UnspecifiedError]
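
For reading the SubAck line above: a SUBACK carries one return code per requested topic filter, in the same order as the SUBSCRIBE. The sketch below assumes the session uses MQTT 3.1.1 (the 4-byte ConnAck and 6-byte SubAck sizes in the log are consistent with that), where 0x00 to 0x02 mean a granted QoS and 0x80 means failure. `SubAckReturnCodes` is a hypothetical helper written for illustration, not part of MQTTnet, which appears to report the failure code as `UnspecifiedError`.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not an MQTTnet type): interprets MQTT 3.1.1 SUBACK return codes.
public static class SubAckReturnCodes
{
    // Maps a single SUBACK return code to a readable description.
    public static string Describe(byte code)
    {
        switch (code)
        {
            case 0x00: return "Granted QoS 0";
            case 0x01: return "Granted QoS 1";
            case 0x02: return "Granted QoS 2";
            case 0x80: return "Failure (subscription rejected)";
            default: return "Reserved";
        }
    }

    // Returns the topic filters the broker refused.
    // filters and codes must be in the same order as in the SUBSCRIBE packet.
    public static List<string> Rejected(IReadOnlyList<string> filters, IReadOnlyList<byte> codes)
    {
        if (filters.Count != codes.Count)
            throw new ArgumentException("Expected one return code per topic filter.");

        var rejected = new List<string>();
        for (int i = 0; i < filters.Count; i++)
        {
            if (codes[i] == 0x80)
                rejected.Add(filters[i]);
        }
        return rejected;
    }
}
```

Applied to the log, both filters would be paired with 0x80, so `Rejected` would return both topics: the broker accepted the connection but refused each subscription.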

What I've Confirmed:

  1. The service successfully connects to EMQX (confirmed via logs and EMQX dashboard)
  2. The connection remains stable
  3. I can successfully publish test messages using MQTTX to the topics I'm trying to subscribe to
  4. The MQTTX messages are visible in the MQTTX History
  5. My service's HandleMessageReceived method is never triggered when messages are sent via MQTTX

Testing Setup:

  1. Using MQTTX to publish test messages to the topics
  2. Topics in configuration match exactly what I'm publishing to in MQTTX
  3. Messages are successfully published (confirmed in MQTTX which is subscribed to the same topic)
  4. Service is running and connected during testing

Environment:

Question:

Why isn't my service receiving messages that I publish through MQTTX? The client connects successfully, but messages sent through MQTTX never trigger the HandleMessageReceived method. What could be preventing the message reception?

Upvotes: 0

Views: 34

Answers (1)

tisaconundrum

Reputation: 2292

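Did you try checking your ACL?

The SubAck in your log returns UnspecifiedError for both topic filters, which means the broker accepted the connection but rejected the subscriptions. In your ACL file, which you can access via https://<broker url>/#/authorization/detail/file, make sure that your IP isn't blacklisted and that your Client ID isn't a string the rules reject.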
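
As a rough illustration of what to look for, here is a sketch of a rule that would permit the subscriptions from the log. It assumes EMQX 5's file-based authorization format (Erlang terms, evaluated top to bottom with the first match deciding); the username `test` and the topic names are taken from the log, and your actual file will differ.

```erlang
%% Assumed example, not copied from a real broker configuration.
%% Allow the user "test" to subscribe to the two topics the service needs.
{allow, {username, "test"}, subscribe, ["test/lookupservice/clients/add", "test/lookupservice/clients/bulkadd"]}.

%% Catch-all rule: anything not matched above is denied.
{deny, all}.
```

If a deny rule that matches your client (by IP, username, or Client ID) appears above the allow rule, the deny wins, so the order of the rules matters as much as their content.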

Upvotes: 0
