Reputation: 2292
I'm building a .NET service that connects to an EMQX broker using MQTTnet. The client connects to EMQX successfully, but when I test by sending messages through MQTTX (an MQTT client tool), my service never receives them. The subscription doesn't seem to take effect, even though I call SubscribeAsync.
Program.cs (Service Setup)
var builder = Host.CreateApplicationBuilder(args);
// Configure services
// Other configuration...
var host = builder.Build();
await host.RunAsync();
public class MQTTClient
{
    private readonly ILogger<MQTTClient> _logger;
    private readonly IConfiguration _configuration;
    private readonly IManagedMqttClient _mqttClient;

    public MQTTClient(ILogger<MQTTClient> logger, IConfiguration configuration)
    {
        _logger = logger;
        _configuration = configuration;
        _mqttClient = new MqttFactory().CreateManagedMqttClient();
        _mqttClient.ConnectedAsync += async (MqttClientConnectedEventArgs eventArgs) =>
        {
            _logger.LogInformation("Connected to MQTT server");
        };
        _mqttClient.DisconnectedAsync += async (MqttClientDisconnectedEventArgs eventArgs) =>
        {
            _logger.LogInformation("Disconnected from MQTT server");
        };
        _mqttClient.ApplicationMessageReceivedAsync += HandleMessageReceived;
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        string connectionstring = _configuration["MQTT:Connectionstring"];
        string username = _configuration["MQTT:Username"];
        string password = _configuration["MQTT:Password"];
        string mqttClientId = $"FetchImportService-{Guid.NewGuid()}";
        bool ws = connectionstring.StartsWith("ws");
        var options = new MqttClientOptionsBuilder()
            .WithClientId(mqttClientId)
            .WithCredentials(username, password);
        if (ws)
        {
            // ... (WebSocket/TCP server configuration omitted)
        }
        var builtOptions = options.Build();
        var mqttOptions = new ManagedMqttClientOptionsBuilder()
            .WithClientOptions(builtOptions)
            .Build();
        await _mqttClient.StartAsync(mqttOptions);
    }

    public async Task SubscribeAsync()
    {
        await _mqttClient.SubscribeAsync(_configuration["MQTT:Addtopic"]);
        await _mqttClient.SubscribeAsync(_configuration["MQTT:Bulkaddtopic"]);
    }

    private async Task HandleMessageReceived(MqttApplicationMessageReceivedEventArgs eventArgs)
    {
        try
        {
            var topic = eventArgs.ApplicationMessage.Topic;
            _logger.LogInformation("Message received: {topic}", topic);
            if (topic.EndsWith(_configuration["MQTT:Addtopic"]))
            {
                _logger.LogInformation("Processing single client add request");
                var data = JsonSerializer.Deserialize<RequestData>(eventArgs.ApplicationMessage.Payload);
                // Process message...
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error processing message");
        }
    }
}
public class ServiceWorker : BackgroundService
{
    private readonly ILogger<ServiceWorker> _logger;
    private readonly MQTTClient _mqttClient;

    public ServiceWorker(ILogger<ServiceWorker> logger, MQTTClient mqttClient)
    {
        _logger = logger;
        _mqttClient = mqttClient;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("ServiceWorker running at: {time}", DateTimeOffset.Now);
        await _mqttClient.StartAsync(stoppingToken);
        await _mqttClient.SubscribeAsync();
        while (!stoppingToken.IsCancellationRequested)
        {
            await Task.Delay(1000, stoppingToken);
        }
        await _mqttClient.StopAsync(stoppingToken);
    }
}
Using what @brits provided, I was able to get some log output. Here's what it looks like:
[15:18:18 INF] Connection with server established.
[15:18:18 INF] Start receiving packets.
[15:18:18 INF] TX (81 bytes) >>> Connect: [ClientId=FetchImportService-488b512d-0b99-4073-9d59-93675cb7b682] [Username=test] [Password=****] [KeepAlivePeriod=15] [CleanSession=True]
[15:18:18 INF] RX (4 bytes) <<< ConnAck: [ReturnCode=ConnectionAccepted] [ReasonCode=Success] [IsSessionPresent=False]
[15:18:18 INF] Authenticated MQTT connection with server established.
[15:18:18 INF] Connected.
[15:18:18 INF] Connected to MQTT server
[15:18:18 INF] Start sending keep alive packets.
[15:18:18 INF] Publishing subscriptions at reconnect
[15:18:18 INF] Publishing 2 added and 0 removed subscriptions
[15:18:18 INF] TX (74 bytes) >>> Subscribe: [PacketIdentifier=1] [TopicFilters=test/lookupservice/clients/add@AtLeastOnce,test/lookupservice/clients/bulkadd@AtLeastOnce]
[15:18:18 INF] RX (6 bytes) <<< SubAck: [PacketIdentifier=1] [ReasonCode=UnspecifiedError,UnspecifiedError]
Why isn't my service receiving the messages that I publish through MQTTX? The client connects successfully, but the HandleMessageReceived method is never triggered. What could be preventing message reception?
Upvotes: 0
Views: 34
Reputation: 2292
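The SubAck in your log returns UnspecifiedError for both topic filters, which means the broker refused the subscriptions even though it accepted the connection. Check your ACL file, which you can access via https://<broker url>/#/authorization/detail/file, and make sure that your IP isn't blacklisted and that your Client ID isn't a string the rules reject.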
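If it helps to see why the SubAck line in your log points at the broker rather than at your handler, here is a minimal sketch that decodes an MQTT 3.1.1 SUBACK packet by hand. The `SubAckDecoder` class is hypothetical (it is not part of MQTTnet), and the six bytes are my reconstruction from the log line (6 bytes, PacketIdentifier=1, two failed filters), not a capture from your broker. It assumes a single-byte remaining length, which holds for a packet this small.

```csharp
using System;
using System.Collections.Generic;

// Bytes reconstructed (an assumption) from the log line:
// "RX (6 bytes) <<< SubAck: [PacketIdentifier=1] [ReasonCode=UnspecifiedError,UnspecifiedError]"
byte[] packet = { 0x90, 0x04, 0x00, 0x01, 0x80, 0x80 };

var (packetId, codes) = SubAckDecoder.Parse(packet);
Console.WriteLine($"PacketIdentifier={packetId}");
foreach (var code in codes)
{
    Console.WriteLine(SubAckDecoder.Describe(code));
}

// Hypothetical helper for illustration; not an MQTTnet type.
public static class SubAckDecoder
{
    // Maps one MQTT 3.1.1 SUBACK return code to a readable description.
    public static string Describe(byte returnCode) => returnCode switch
    {
        0x00 => "granted QoS 0",
        0x01 => "granted QoS 1",
        0x02 => "granted QoS 2",
        0x80 => "failure (subscription refused by the broker)",
        _ => $"reserved (0x{returnCode:X2})"
    };

    // Parses a whole SUBACK packet: packet type byte, remaining length,
    // two-byte packet identifier, then one return code per topic filter.
    public static (ushort PacketId, IReadOnlyList<byte> ReturnCodes) Parse(byte[] packet)
    {
        if (packet == null || packet.Length < 5 || packet[0] != 0x90)
        {
            throw new ArgumentException("Not a SUBACK packet.", nameof(packet));
        }
        if ((packet[1] & 0x80) != 0 || packet[1] != packet.Length - 2)
        {
            throw new ArgumentException("Unsupported or inconsistent remaining length.", nameof(packet));
        }

        ushort packetId = (ushort)((packet[2] << 8) | packet[3]);
        var returnCodes = new List<byte>();
        for (int i = 4; i < packet.Length; i++)
        {
            returnCodes.Add(packet[i]);
        }
        return (packetId, returnCodes);
    }
}
```

Both return codes come out as failures, one per topic filter in the Subscribe packet. Since a refused subscription never delivers messages, HandleMessageReceived has nothing to fire on until the broker's authorization rules allow those topics.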
Upvotes: 0