Reputation: 15435
I have the following publish method that connects to a given broker and sends a message and then disconnects:
def publish(mqttCfg: MqttConfig, topic: String, mqttQos: MqttQos): Future[Unit] = {
val client = asyncMqttClient(mqttCfg)
// Define a custom wrapper type to represent the result of the publish operation
sealed trait PublishResult
case class SuccessfulPublish(mqttPublishResult: Mqtt5PublishResult) extends PublishResult
case class FailedPublish(error: Throwable) extends PublishResult
asyncMqttClient(mqttCfg).connect()
.thenCompose(_ => client.publishWith().topic(topic).qos(mqttQos).payload("HELLO WORLD!".getBytes()).send())
.thenAccept(result => {
val publishResult = Try(result)
publishResult match {
case Success(message) =>
println(s"publishedResult: $message") // TODO: Change to logger
case Failure(error) =>
println(s"Failed to publish: ${error.getMessage}") // TODO: Change to logg
}
})
.thenCompose(_ => client.disconnect())
.thenAccept(_ => println("disconnected"))
.asScala.map(_ => ())
}
I then have a Scala test that simply tests this like this:
"MqttClientFactory#publish" should "connect to a local MQTT broker and publish" in {
val mqttConfig = MqttConfig("cpo-platform-test", "test.mosquitto.org", 1883)
val published = MqttClientFactory.publish(
mqttConfig,
"cpo-test-topic",
MqttQos.EXACTLY_ONCE
)
whenReady(published, timeout(Span(100, Seconds))) { Unit => {
val client = MqttClientFactory.asyncMqttClient(mqttConfig)
println("In here ****************** ")
client
.connect()
.thenCompose(_ => client.subscribeWith().topicFilter("cpo-test-topic").qos(MqttQos.EXACTLY_ONCE).callback(println).send())
}
}
}
When I ran this, it results in the following error on the place where I'm waiting for the Future to complete in the whenReady(......)
The future returned an exception of type: java.util.concurrent.CompletionException, with message: com.hivemq.client.mqtt.exceptions.MqttClientStateException: MQTT client is not connected..
ScalaTestFailureLocation: com.openelectrons.cpo.mqtt.MqttClientFactoryTest at (MqttClientFactoryTest.scala:29)
I tried several brokers on my local machine, the eclipse mosquitto broker, the cedalo broker and all of them return the same message. What am I doing wrong? It is so annoying to have a simple connection to get it working. Any help?
EIDT: Further details added:
def asyncMqttClient(mqttCfg: MqttConfig): Mqtt5AsyncClient = {
Mqtt5Client.builder()
.identifier(mqttCfg.appName)
.serverHost(mqttCfg.serverHost)
.serverPort(mqttCfg.serverPort)
.simpleAuth()
.username(mqttCfg.user.getOrElse(""))
.password(mqttCfg.pass.getOrElse("").getBytes("UTF-8"))
.applySimpleAuth()
.buildAsync()
}
I use the following docker compose to start my local mqtt mosquitto server:
version: "3.7"
services:
mqtt5:
image: eclipse-mosquitto
container_name: mqtt5
ports:
- 1883:1883 #default mqtt port
- 9001:9001 #default mqtt port for websockets
volumes:
- /opt/softwares/mosquitto/mqtt5/config:/mosquitto/config
The MQTT broker is successfully started as shown in the screenshot below:
EDIT:
Here is my mosquitto.conf:
listener 1883
allow_anonymous true
persistence true
persistence_location /mosquitto/data/
log_dest file /mosquitto/log/mosquitto.log
Here is a screenshot of the logs:
EDIT:
joesan@joesan-InfinityBook-S-14-v5:~$ docker exec -it mqtt5 mosquitto_pub -t /test/message -m 'Hello World!'
joesan@joesan-InfinityBook-S-14-v5:~$ docker exec -it mqtt5 tail -f /mosquitto/log/mosquitto.log
1696296934: Saving in-memory database to /mosquitto/data//mosquitto.db.
1696298735: Saving in-memory database to /mosquitto/data//mosquitto.db.
1696300536: Saving in-memory database to /mosquitto/data//mosquitto.db.
1696302337: Saving in-memory database to /mosquitto/data//mosquitto.db.
1696304138: Saving in-memory database to /mosquitto/data//mosquitto.db.
1696305939: Saving in-memory database to /mosquitto/data//mosquitto.db.
1696307740: Saving in-memory database to /mosquitto/data//mosquitto.db.
1696309170: New connection from 127.0.0.1:39422 on port 1883.
1696309170: New client connected from 127.0.0.1:39422 as auto-8817AB58-2BA0-33D2-5AB0-A6176558E97C (p2, c1, k60).
1696309170: Client auto-8817AB58-2BA0-33D2-5AB0-A6176558E97C disconnected.
^Cjoesan@joesan-InfinityBook-S-14-v5:~$ docker exec -it mqtt5 mosquitto_sub -v -t /test/message
/test/message Hello World!
With the scala test, I see the following logs:
1696310903: New connection from 192.168.208.1:57752 on port 1883.
1696310903: New client connected from 192.168.208.1:57752 as cpo-platform-test (p5, c1, k60).
1696310903: Client cpo-platform-test closed its connection.
Upvotes: 0
Views: 642
Reputation: 72
Based on the code you've provided, it looks like we're attempting to use a version of the MQTT5AsyncClient with the hivemq-mqtt-client as mentioned by Gastón Schabas previously.
As Gastón has mentioned above with their working example, this should allow a simple connection to the MQTT broker. Based on what has been provided, it looks like there may be a component from the builder that is inaccurately configuring the client.
My initial recommendation would be to attempt to utilize the Async example that lives within the HiveMQ repo as an initial test, and then build additional functionality using this example as a known-good template. This async demo can be found here.
Best,
Aaron from the HiveMQ Team
Upvotes: 2
Reputation: 3581
Here you have a simple POC that I was able to run in my local. It doesn't validate anything. I only start a eclipse-mosquitto container, connect to the service using the hivemq-mqtt-client, publish a message, subscribe to the topic and print the received message to stdout
.
build.sbt
libraryDependencies ++= Seq(
"com.hivemq" % "hivemq-mqtt-client" % "1.3.2",
"org.scalatest" %% "scalatest" % "3.2.16" % Test,
"com.dimafeng" %% "testcontainers-scala-scalatest" % TestcontainersScalaVersion % Test
)
docker-compose.yaml
version: "3"
services:
mosquitto:
image: eclipse-mosquitto:2.0.18
volumes:
- /absolute/path/to/mosquitto/config/:/mosquitto/config
ports:
- 1883:1883
- 9001:9001
/absolute/path/to/mosquitto/config/mosquitto.conf
listener 1883
allow_anonymous true
persistence true
persistence_location /mosquitto/data/
log_dest file /mosquitto/log/mosquitto.log
DummyMosquittoTest.scala
import com.dimafeng.testcontainers.scalatest.TestContainersForAll
import com.dimafeng.testcontainers.{DockerComposeContainer, ExposedService}
import com.hivemq.client.mqtt.datatypes.MqttQos
import com.hivemq.client.mqtt.mqtt5.{Mqtt5AsyncClient, Mqtt5Client}
import org.scalatest.funsuite.AsyncFunSuite
import org.scalatest.matchers.should.Matchers
import java.io.File
import java.util.UUID
import scala.jdk.FutureConverters._
class TestcontainersMainTest
extends AsyncFunSuite
with Matchers
with TestContainersForAll {
override type Containers = DockerComposeContainer
override def startContainers(): DockerComposeContainer = {
DockerComposeContainer
.Def(
composeFiles = new File(
this.getClass.getClassLoader
.getResource("docker-compose.yaml")
.getFile
),
exposedServices = Seq(ExposedService(name = "mosquitto", port = 1883))
)
.start()
}
test("mosquitto container") {
withContainers { container =>
val client: Mqtt5AsyncClient = Mqtt5Client
.builder()
.identifier(UUID.randomUUID().toString())
.serverHost("broker.hivemq.com")
.buildAsync()
client
.connect()
.thenCompose(_ =>
client
.publishWith()
.topic("test/topic")
.payload("some random message!!!".getBytes)
.send()
)
.asScala
.map(_ => 1 should be(1))
client
.subscribeWith()
.topicFilter("test/topic")
.qos(MqttQos.EXACTLY_ONCE)
.callback(x => println(new String(x.getPayloadAsBytes)))
.send()
.asScala
.map(_ => 1 should be(1))
}
}
}
Upvotes: 1