Mr. Robot
Mr. Robot

Reputation: 331

Unable to connect to MQTT using the wss protocol

I want to connect to an MQTT broker using the wss (secure WebSocket) protocol.

What I have tried:

import 'dart:async';
import 'dart:io';
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';

/// MQTT client shared by [main] and the callback functions in this file.
final client = MqttServerClient('wss://myServer.com/mqtt', '');

/// Connects [client] over a WebSocket, subscribes to two topics, publishes
/// one message, waits, then unsubscribes and disconnects.
///
/// Returns 0 after the final disconnect. Calls `exit(-1)` when the client is
/// not in the connected state after the connection attempt.
Future<int> main() async {
  // NOTE(review): the log posted with this code shows the connection being
  // attempted on port 1883 rather than 8083 - confirm that this assignment
  // takes effect with the package version in use.
  client
    ..useWebSocket = true
    ..port = 8083
    ..logging(on: true)
    ..keepAlivePeriod = 20
    ..onDisconnected = onDisconnected
    ..onConnected = onConnected
    ..onSubscribed = onSubscribed
    ..pongCallback = pong;

  final connMess = MqttConnectMessage()
      .withClientIdentifier('Mqtt_MyClientUniqueId')
      .keepAliveFor(20) // Same value as keepAlivePeriod above.
      .withWillTopic('willtopic')
      .withWillMessage('My Will message')
      .startClean()
      .withWillQos(MqttQos.atLeastOnce);
  print('EXAMPLE::Mosquitto client connecting....');
  client.connectionMessage = connMess;

  try {
    await client.connect();
  } on NoConnectionException catch (e) {
    print('EXAMPLE::client exception - $e');
    client.disconnect();
  } on SocketException catch (e) {
    print('EXAMPLE::socket exception - $e');
    client.disconnect();
  }

  // Check we are connected.
  if (client.connectionStatus.state == MqttConnectionState.connected) {
    print('EXAMPLE::Mosquitto client connected');
  } else {
    // A single-quoted Dart string cannot contain a raw line break, so the
    // long message is built from adjacent string literals instead.
    print('EXAMPLE::ERROR Mosquitto client connection failed - '
        'disconnecting, status is ${client.connectionStatus}');
    client.disconnect();
    exit(-1);
  }

  print('EXAMPLE::Subscribing to the test/lol topic');
  const topic = 'test/lol';
  client.subscribe(topic, MqttQos.atMostOnce);

  // Print the topic and decoded payload of the first message in each batch
  // delivered on the updates stream.
  client.updates.listen((List<MqttReceivedMessage<MqttMessage>> c) {
    final recMess = c[0].payload as MqttPublishMessage;
    final pt =
        MqttPublishPayload.bytesToStringAsString(recMess.payload.message);
    print(
        'EXAMPLE::Change notification:: topic is <${c[0].topic}>, payload is <-- $pt -->');
    print('');
  });

  // Print the topic and QoS of every message reported on the published
  // stream.
  client.published.listen((MqttPublishMessage message) {
    print(
        'EXAMPLE::Published notification:: topic is ${message.variableHeader.topicName}, with      Qos ${message.header.qos}');
  });

  const pubTopic = 'Dart/Mqtt_client/testtopic';
  final builder = MqttClientPayloadBuilder();
  builder.addString('Hello from mqtt_client');

  print('EXAMPLE::Subscribing to the Dart/Mqtt_client/testtopic topic');
  client.subscribe(pubTopic, MqttQos.exactlyOnce);

  print('EXAMPLE::Publishing our topic');
  client.publishMessage(pubTopic, MqttQos.exactlyOnce, builder.payload);

  print('EXAMPLE::Sleeping....');
  await MqttUtilities.asyncSleep(120);

  print('EXAMPLE::Unsubscribing');
  client.unsubscribe(topic);

  await MqttUtilities.asyncSleep(2);
  print('EXAMPLE::Disconnecting');
  client.disconnect();
  return 0;
}
/// Prints a confirmation line naming [topic].
///
/// Assigned to `client.onSubscribed` in `main`.
void onSubscribed(String topic) =>
    print('EXAMPLE::Subscription confirmed for topic $topic');

/// Disconnection callback, assigned to `client.onDisconnected` in `main`.
///
/// Always logs the disconnection. When the client reports the disconnection
/// origin as [MqttDisconnectionOrigin.solicited] the callback logs that this
/// is correct and returns; for any other origin it terminates the process
/// with exit code -1.
void onDisconnected() {
  print('EXAMPLE::OnDisconnected client callback - Client disconnection');
  if (client.connectionStatus.disconnectionOrigin ==
      MqttDisconnectionOrigin.solicited) {
    print('EXAMPLE::OnDisconnected callback is solicited, this is correct');
    // Returning here lets the code that called client.disconnect() keep
    // running (print its own diagnostics, return its own exit code) instead
    // of being killed with a failure code for an expected disconnect.
    return;
  }
  exit(-1);
}
/// Prints a line reporting that the connected callback ran.
///
/// Assigned to `client.onConnected` in `main`.
void onConnected() {
  const message =
      'EXAMPLE::OnConnected client callback - Client connection was sucessful';
  print(message);
}
/// Prints a line reporting that the ping-response callback ran.
///
/// Assigned to `client.pongCallback` in `main`.
void pong() => print('EXAMPLE::Ping response client callback invoked');

and I get the following output in the logs:

EXAMPLE::Mosquitto client connecting....
I/flutter (17123): 1-2021-01-27 15:15:27.085221 -- MqttConnectionHandlerBase::connect - server wss://myServer.com/mqtt, port 1883
I/flutter (17123): 1-2021-01-27 15:15:27.103638 -- SynchronousMqttServerConnectionHandler::internalConnect entered
I/flutter (17123): 1-2021-01-27 15:15:27.104865 -- SynchronousMqttServerConnectionHandler::internalConnect - initiating connection try 0, auto reconnect in progress false
I/flutter (17123): 1-2021-01-27 15:15:27.107198 -- SynchronousMqttServerConnectionHandler::internalConnect - websocket selected
I/flutter (17123): 1-2021-01-27 15:15:27.110498 -- SynchronousMqttServerConnectionHandler::internalConnect - calling connect
I/flutter (17123): 1-2021-01-27 15:15:27.117350 -- MqttWsConnection::connect - entered
I/flutter (17123): 1-2021-01-27 15:15:27.168054 -- MqttWsConnection::connect - WS URL is wss://myServer.com:1883/mqtt, protocols are [mqtt, mqttv3.1, mqttv3.11]
I/flutter (17123): 1-2021-01-27 15:17:37.221102 -- MqttConnectionBase::_onError - calling disconnected callback
I/flutter (17123): EXAMPLE::socket exception - SocketException: OS Error: Connection timed out, errno = 110, address = myServer.com, port = 32892
I/flutter (17123): 1-2021-01-27 15:17:37.243831 -- MqttConnectionHandlerBase::disconnect - entered
I/flutter (17123): 1-2021-01-27 15:17:37.245454 -- MqttConnectionHandlerBase::_performConnectionDisconnect entered
I/flutter (17123): EXAMPLE::OnDisconnected client callback - Client disconnection

Upvotes: 2

Views: 1264

Answers (1)

Ali Mahmoodi
Ali Mahmoodi

Reputation: 61

This took me a long time to get right, but the following code works. Please do not set `secure = true`; the client is configured as WebSocket-based (`useWebSocket = true`) instead.

    /// Singleton wrapper around an [MqttServerClient] that is configured for a
    /// WebSocket connection to a `wss://` URL.
    ///
    /// Obtain the shared instance with [getInstanse], then call [connectMqtt] or
    /// [subscribeTopics] (which connects first when [isConnect] is false).
    class MqttConnection {
      /// Creates the client and applies the fixed connection settings.
      ///
      /// `secure` is deliberately left untouched; only `useWebSocket` is enabled.
      MqttConnection._()
          : _mqttClient =
                MqttServerClient("wss://your domain/", "flutter_client") {
        _mqttClient
          ..port = 9001
          ..keepAlivePeriod = 20
          ..autoReconnect = true
          ..onConnected = _mqttConnected
          ..onDisconnected = _mqttDisconnected
          ..useWebSocket = true
          ..logging(on: true);
      }

      static MqttConnection? _mqttConnection;

      /// The underlying client; created once together with the singleton.
      final MqttServerClient _mqttClient;

      /// Optional hook checked by [mqttEventRecived].
      Function? callback;

      /// Whether the client is currently considered connected.
      ///
      /// Updated by [connectMqtt], [stopMqtt] and the connected/disconnected
      /// callbacks registered on the client.
      bool isConnect = false;

      /// Cleared by [stopMqtt]; not read anywhere inside this class.
      bool tryToConnect = true;

      /// Returns the shared [MqttConnection], creating it on first use.
      static MqttConnection getInstanse() =>
          _mqttConnection ??= MqttConnection._();

      /// Builds the connect message, installs the trusted certificate bundle
      /// from `assets/cert/bundle.crt`, connects, and starts listening for
      /// updates.
      ///
      /// Sets [isConnect] according to the connection state returned by the
      /// client. Errors thrown by the client's `connect()` propagate to the
      /// caller.
      Future<void> connectMqtt() async {
        final connMess = MqttConnectMessage()
            .authenticateAs(ApiConstant.mqttUser, ApiConstant.mqttPassword)
            .withClientIdentifier((await _getDeviceId())!)
            .withWillQos(MqttQos.atLeastOnce)
            .startClean();
        _mqttClient.connectionMessage = connMess;

        final certificate = await rootBundle.load("assets/cert/bundle.crt");
        _mqttClient.securityContext
            .setTrustedCertificatesBytes(certificate.buffer.asUint8List());

        final connection = await _mqttClient.connect();
        // A missing status is treated as "not connected" instead of throwing
        // on a null assertion.
        isConnect = connection?.state == MqttConnectionState.connected;
        _mqttClient.updates?.listen(mqttEventRecived);
      }

      /// Disconnects the client and marks the connection as stopped.
      void stopMqtt() {
        tryToConnect = false;
        _mqttClient.disconnect();
        isConnect = false;
      }

      /// Subscribes to every topic in [topics] at QoS "at least once",
      /// connecting first when [isConnect] is false.
      Future<void> subscribeTopics(List<String> topics) async {
        if (!isConnect) {
          await connectMqtt();
        }
        for (final topic in topics) {
          _mqttClient.subscribe(topic, MqttQos.atLeastOnce);
        }
      }

      /// Unsubscribes from every topic in [topics].
      void unSubscribeTopics(List<String> topics) {
        for (final topic in topics) {
          _mqttClient.unsubscribe(topic);
        }
      }

      void _mqttConnected() {
        print("connected");
        isConnect = true;
      }

      void _mqttDisconnected() {
        isConnect = false;
      }

      /// Listener attached to the client's updates stream by [connectMqtt].
      Future<void> mqttEventRecived(event) async {
        if (callback != null) {
          // NOTE(review): this branch is empty in the original snippet, so
          // [callback] is never invoked; dispatch `event` here as the
          // application requires.
        }
      }
    }

Upvotes: 1

Related Questions