joesan

Reputation: 15435

Unable to Connect to any MQTT Broker from my Local machine using HiveMQ Async Client

I have the following publish method that connects to a given broker and sends a message and then disconnects:

 def publish(mqttCfg: MqttConfig, topic: String, mqttQos: MqttQos): Future[Unit] = {
    val client = asyncMqttClient(mqttCfg)

    // Define a custom wrapper type to represent the result of the publish operation
    sealed trait PublishResult
    case class SuccessfulPublish(mqttPublishResult: Mqtt5PublishResult) extends PublishResult
    case class FailedPublish(error: Throwable) extends PublishResult

    asyncMqttClient(mqttCfg).connect()
      .thenCompose(_ => client.publishWith().topic(topic).qos(mqttQos).payload("HELLO WORLD!".getBytes()).send())
      .thenAccept(result => {
        val publishResult = Try(result)
        publishResult match {
          case Success(message) =>
            println(s"publishedResult: $message") // TODO: Change to logger
          case Failure(error) =>
            println(s"Failed to publish: ${error.getMessage}") // TODO: Change to logger
        }
      })
      .thenCompose(_ => client.disconnect())
      .thenAccept(_ => println("disconnected"))
      .asScala.map(_ => ())
  }

I then have a ScalaTest spec that exercises it:

  "MqttClientFactory#publish" should "connect to a local MQTT broker and publish" in {
    val mqttConfig = MqttConfig("cpo-platform-test", "test.mosquitto.org", 1883)
    val published = MqttClientFactory.publish(
      mqttConfig,
      "cpo-test-topic",
      MqttQos.EXACTLY_ONCE
    )
    whenReady(published, timeout(Span(100, Seconds))) { Unit => {
      val client = MqttClientFactory.asyncMqttClient(mqttConfig)
      println("In here ****************** ")
      client
        .connect()
        .thenCompose(_ => client.subscribeWith().topicFilter("cpo-test-topic").qos(MqttQos.EXACTLY_ONCE).callback(println).send())
      }
    }
  }

When I run this, it fails with the following error at the point where I wait for the Future to complete in whenReady(...):

The future returned an exception of type: java.util.concurrent.CompletionException, with message: com.hivemq.client.mqtt.exceptions.MqttClientStateException: MQTT client is not connected..
ScalaTestFailureLocation: com.openelectrons.cpo.mqtt.MqttClientFactoryTest at (MqttClientFactoryTest.scala:29)

I tried several brokers on my local machine (the Eclipse Mosquitto broker and the Cedalo broker), and all of them return the same message. What am I doing wrong? It is frustrating that such a simple connection does not work. Any help?

EDIT: Further details added:

  def asyncMqttClient(mqttCfg: MqttConfig): Mqtt5AsyncClient = {
    Mqtt5Client.builder()
      .identifier(mqttCfg.appName)
      .serverHost(mqttCfg.serverHost)
      .serverPort(mqttCfg.serverPort)
      .simpleAuth()
        .username(mqttCfg.user.getOrElse(""))
        .password(mqttCfg.pass.getOrElse("").getBytes("UTF-8"))
      .applySimpleAuth()
      .buildAsync()
  }

I use the following docker compose to start my local mqtt mosquitto server:

version: "3.7"
services:
  mqtt5:
    image: eclipse-mosquitto
    container_name: mqtt5
    ports:
      - 1883:1883 #default mqtt port
      - 9001:9001 #default mqtt port for websockets
    volumes:
      - /opt/softwares/mosquitto/mqtt5/config:/mosquitto/config

The MQTT broker is successfully started as shown in the screenshot below:

[Screenshot: broker startup output]

EDIT:

Here is my mosquitto.conf:

listener 1883
allow_anonymous true
persistence true
persistence_location /mosquitto/data/
log_dest file /mosquitto/log/mosquitto.log

Here is a screenshot of the logs:

[Screenshot: broker logs]

EDIT:

joesan@joesan-InfinityBook-S-14-v5:~$ docker exec -it mqtt5 mosquitto_pub -t /test/message -m 'Hello World!'
joesan@joesan-InfinityBook-S-14-v5:~$ docker exec -it mqtt5 tail -f /mosquitto/log/mosquitto.log
1696296934: Saving in-memory database to /mosquitto/data//mosquitto.db.
1696298735: Saving in-memory database to /mosquitto/data//mosquitto.db.
1696300536: Saving in-memory database to /mosquitto/data//mosquitto.db.
1696302337: Saving in-memory database to /mosquitto/data//mosquitto.db.
1696304138: Saving in-memory database to /mosquitto/data//mosquitto.db.
1696305939: Saving in-memory database to /mosquitto/data//mosquitto.db.
1696307740: Saving in-memory database to /mosquitto/data//mosquitto.db.
1696309170: New connection from 127.0.0.1:39422 on port 1883.
1696309170: New client connected from 127.0.0.1:39422 as auto-8817AB58-2BA0-33D2-5AB0-A6176558E97C (p2, c1, k60).
1696309170: Client auto-8817AB58-2BA0-33D2-5AB0-A6176558E97C disconnected.
^Cjoesan@joesan-InfinityBook-S-14-v5:~$ docker exec -it mqtt5 mosquitto_sub -v -t /test/message
/test/message Hello World!

With the scala test, I see the following logs:

1696310903: New connection from 192.168.208.1:57752 on port 1883.
1696310903: New client connected from 192.168.208.1:57752 as cpo-platform-test (p5, c1, k60).
1696310903: Client cpo-platform-test closed its connection.

Upvotes: 0

Views: 642

Answers (2)

Aaron Franz

Reputation: 72

Based on the code you've provided, it looks like you're using the Mqtt5AsyncClient from the hivemq-mqtt-client library, as Gastón Schabas mentioned previously.

As Gastón's working example shows, this should allow a simple connection to the MQTT broker. Based on what has been provided, it looks like some part of the builder chain may be misconfiguring the client.

My initial recommendation would be to run the async example from the HiveMQ repo as a first test, and then build additional functionality on top of it as a known-good template. This async demo can be found here.

Best,

Aaron from the HiveMQ Team

Upvotes: 2

Gastón Schabas

Reputation: 3581

Here is a simple POC that I was able to run locally. It doesn't validate anything. It only starts an eclipse-mosquitto container, connects to the service using the hivemq-mqtt-client, publishes a message, subscribes to the topic and prints the received message to stdout.

  • build.sbt
libraryDependencies ++= Seq(
  "com.hivemq" % "hivemq-mqtt-client" % "1.3.2",
  "org.scalatest" %% "scalatest" % "3.2.16" % Test,
  "com.dimafeng" %% "testcontainers-scala-scalatest" % TestcontainersScalaVersion % Test
)
  • docker-compose.yaml
version: "3"
services:
  mosquitto:
    image: eclipse-mosquitto:2.0.18
    volumes:
      - /absolute/path/to/mosquitto/config/:/mosquitto/config
    ports:
      - 1883:1883
      - 9001:9001
  • /absolute/path/to/mosquitto/config/mosquitto.conf
listener 1883
allow_anonymous true
persistence true
persistence_location /mosquitto/data/
log_dest file /mosquitto/log/mosquitto.log
  • DummyMosquittoTest.scala
import com.dimafeng.testcontainers.scalatest.TestContainersForAll
import com.dimafeng.testcontainers.{DockerComposeContainer, ExposedService}
import com.hivemq.client.mqtt.datatypes.MqttQos
import com.hivemq.client.mqtt.mqtt5.{Mqtt5AsyncClient, Mqtt5Client}
import org.scalatest.funsuite.AsyncFunSuite
import org.scalatest.matchers.should.Matchers

import java.io.File
import java.util.UUID
import scala.jdk.FutureConverters._

class TestcontainersMainTest
    extends AsyncFunSuite
    with Matchers
    with TestContainersForAll {

  override type Containers = DockerComposeContainer

  override def startContainers(): DockerComposeContainer = {
    DockerComposeContainer
      .Def(
        composeFiles = new File(
          this.getClass.getClassLoader
            .getResource("docker-compose.yaml")
            .getFile
        ),
        exposedServices = Seq(ExposedService(name = "mosquitto", port = 1883))
      )
      .start()
  }

  test("mosquitto container") {
    withContainers { container =>
      val client: Mqtt5AsyncClient = Mqtt5Client
        .builder()
        .identifier(UUID.randomUUID().toString())
        // Note: this host is the public HiveMQ broker, not the mosquitto container started above
        .serverHost("broker.hivemq.com")
        .buildAsync()

      client
        .connect()
        .thenCompose(_ =>
          client
            .publishWith()
            .topic("test/topic")
            .payload("some random message!!!".getBytes)
            .send()
        )
        .asScala
        .map(_ => 1 should be(1))

      client
        .subscribeWith()
        .topicFilter("test/topic")
        .qos(MqttQos.EXACTLY_ONCE)
        .callback(x => println(new String(x.getPayloadAsBytes)))
        .send()
        .asScala
        .map(_ => 1 should be(1))
    }
  }
}
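
For the publish method in the question, one detail may be worth checking: it builds a client with asyncMqttClient(mqttCfg) on one line and then calls asyncMqttClient(mqttCfg).connect() on another, so connect() runs on a second instance while publishWith() runs on the first one, which was never connected. That would be consistent with the "MQTT client is not connected" exception. Below is a minimal sketch that keeps every step on one instance, assuming Scala 2.13 and hivemq-mqtt-client 1.3.x. The names SingleClientPublish and buildClient are made up for illustration and are not part of the original code.

```scala
import com.hivemq.client.mqtt.datatypes.MqttQos
import com.hivemq.client.mqtt.mqtt5.{Mqtt5AsyncClient, Mqtt5Client}

import java.nio.charset.StandardCharsets
import java.util.UUID
import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.FutureConverters._

object SingleClientPublish {

  // Hypothetical helper: builds exactly one client for the given broker address.
  def buildClient(host: String, port: Int): Mqtt5AsyncClient =
    Mqtt5Client
      .builder()
      .identifier(UUID.randomUUID().toString)
      .serverHost(host)
      .serverPort(port)
      .buildAsync()

  // Connects, publishes and disconnects, always on the client passed in.
  def publish(client: Mqtt5AsyncClient, topic: String, qos: MqttQos, payload: String)(
      implicit ec: ExecutionContext
  ): Future[Unit] =
    client
      .connect() // connect on this instance ...
      .thenCompose(_ =>
        client // ... and publish on the very same instance
          .publishWith()
          .topic(topic)
          .qos(qos)
          .payload(payload.getBytes(StandardCharsets.UTF_8))
          .send()
      )
      .thenCompose(_ => client.disconnect())
      .asScala
      .map(_ => ())
}
```

It could be called as SingleClientPublish.publish(SingleClientPublish.buildClient("localhost", 1883), "test/topic", MqttQos.AT_LEAST_ONCE, "hello"). If connect or publish fails, the returned Future fails with that exception and the disconnect step is skipped, so a real implementation would probably want extra cleanup.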

Upvotes: 1
