Reputation: 2292
I'm building a .NET service that connects to an EMQX broker using MQTTnet. While the client connects to EMQX, when I test sending messages through MQTTX (MQTT client tool), my service never receives them. The subscription doesn't seem to be getting subscribed, despite using the SubscribeAsync
Program.cs (Service Setup)
var builder = Host.CreateApplicationBuilder(args);
// Configure services
builder.Services.AddSingleton<MQTTClient>();
builder.Services.AddHostedService<ServiceWorker>();
// Other configuration...
var host = builder.Build();
await host.RunAsync();
MQTTClient.cs
public class MQTTClient
{
private readonly ILogger<MQTTClient> _logger;
private readonly IConfiguration _configuration;
private readonly IManagedMqttClient _mqttClient;
public MQTTClient(ILogger<MQTTClient> logger, IConfiguration configuration)
{
_logger = logger;
_configuration = configuration;
_mqttClient = new MqttFactory().CreateManagedMqttClient();
_mqttClient.ConnectedAsync += async (MqttClientConnectedEventArgs eventArgs) =>
{
_logger.LogInformation("Connected to MQTT server");
};
_mqttClient.DisconnectedAsync += async (MqttClientDisconnectedEventArgs eventArgs) =>
{
_logger.LogInformation("Disconnected from MQTT server");
};
_mqttClient.ApplicationMessageReceivedAsync += HandleMessageReceived;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
string connectionstring = _configuration["MQTT:Connectionstring"];
string username = _configuration["MQTT:Username"];
string password = _configuration["MQTT:Password"];
string mqttClientId = $"FetchImportService-{Guid.NewGuid()}";
bool ws = connectionstring.StartsWith("ws");
var options = new MqttClientOptionsBuilder()
.WithClientId(mqttClientId)
.WithCredentials(username, password)
.WithTls()
.WithCleanSession();
if (ws)
options.WithWebSocketServer(connectionstring);
else
options.WithTcpServer(connectionstring);
var builtOptions = options.Build();
var mqttOptions = new ManagedMqttClientOptionsBuilder()
.WithClientOptions(builtOptions)
.WithAutoReconnectDelay(TimeSpan.FromSeconds(3))
.WithPendingMessagesOverflowStrategy(MqttPendingMessagesOverflowStrategy.DropNewMessage)
.Build();
await _mqttClient.StartAsync(mqttOptions);
}
public async Task SubscribeAsync()
{
await _mqttClient.SubscribeAsync(_configuration["MQTT:Addtopic"]);
await _mqttClient.SubscribeAsync(_configuration["MQTT:Bulkaddtopic"]);
}
private async Task HandleMessageReceived(MqttApplicationMessageReceivedEventArgs eventArgs)
{
var topic = eventArgs.ApplicationMessage.Topic;
_logger.LogInformation("Message received: {topic}", topic);
try
{
if (topic.EndsWith(_configuration["MQTT:Addtopic"]))
{
_logger.LogInformation("Processing single client add request");
var data = JsonSerializer.Deserialize<RequestData>(eventArgs.ApplicationMessage.Payload);
// Process message...
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing message");
}
}
}
ServiceWorker.cs
public class ServiceWorker : BackgroundService
{
private readonly ILogger<ServiceWorker> _logger;
private readonly MQTTClient _mqttClient;
public ServiceWorker(ILogger<ServiceWorker> logger, MQTTClient mqttClient)
{
_logger = logger;
_mqttClient = mqttClient;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("ServiceWorker running at: {time}", DateTimeOffset.Now);
await _mqttClient.StartAsync(stoppingToken);
await _mqttClient.SubscribeAsync();
while (!stoppingToken.IsCancellationRequested)
{
await Task.Delay(1000, stoppingToken);
}
await _mqttClient.StopAsync(stoppingToken);
}
}
Using what @brits provided I was able to get some logs output. Here's what those look like.
[15:18:18 INF] Connection with server established.
[15:18:18 INF] Start receiving packets.
[15:18:18 INF] TX (81 bytes) >>> Connect: [ClientId=FetchImportService-488b512d-0b99-4073-9d59-93675cb7b682] [Username=test] [Password=****] [KeepAlivePeriod=15] [CleanSession=True]
[15:18:18 INF] RX (4 bytes) <<< ConnAck: [ReturnCode=ConnectionAccepted] [ReasonCode=Success] [IsSessionPresent=False]
[15:18:18 INF] Authenticated MQTT connection with server established.
[15:18:18 INF] Connected.
[15:18:18 INF] Connected to MQTT server
[15:18:18 INF] Start sending keep alive packets.
[15:18:18 INF] Publishing subscriptions at reconnect
[15:18:18 INF] Publishing 2 added and 0 removed subscriptions
[15:18:18 INF] TX (74 bytes) >>> Subscribe: [PacketIdentifier=1] [TopicFilters=test/lookupservice/clients/add@AtLeastOnce,test/lookupservice/clients/bulkadd@AtLeastOnce]
[15:18:18 INF] RX (6 bytes) <<< SubAck: [PacketIdentifier=1] [ReasonCode=UnspecifiedError,UnspecifiedError]
HandleMessageReceived
method is never triggered when messages are sent via MQTTXWhy isn't my service receiving messages that I publish through MQTTX? The client connects successfully, but messages sent through MQTTX never trigger the HandleMessageReceived
method. What could be preventing the message reception?
Upvotes: 0
Views: 34
Reputation: 2292
In your ACL file, which you can access via https://<broker url>/#/authorization/detail/file
make sure your IP isn't blacklisted and/or you're using an unaccepted string for your Client ID.
Upvotes: 0