Reputation: 1234
I am seeing inconsistent message delivery with message persistence and qos=2 on mosquitto. Is there anything I'm doing wrong?
I have a simple test app that subscribes to a topic with clientId="receive-client" and then immediately disconnects. It then connects as clientId="send-client" and publishes 10 messages, "message #1" ... "message #10". It then disconnects, waits five seconds, and reconnects as "receive-client" to consume, printing and counting the messages received.
The result is inconsistent. Sometimes I receive 6 messages, sometimes 8. Typical output is something like this:
WARN[0005] GOT A MESSAGE:message #1
WARN[0005] GOT A MESSAGE:message #2
WARN[0005] GOT A MESSAGE:message #3
WARN[0005] GOT A MESSAGE:message #4
WARN[0005] GOT A MESSAGE:message #5
WARN[0005] GOT A MESSAGE:message #6
WARN[0005] GOT A MESSAGE:message #7
WARN[0005] GOT A MESSAGE:message #8
WARN[0305] PAUSE
WARN[0605] received message count=8
My version information says 1.4.15. My mosquitto.conf is:
pid_file /var/run/mosquitto.pid
persistence true
persistence_location /var/lib/mosquitto/
allow_anonymous false
password_file /etc/mosquitto/passwd
log_dest file /var/log/mosquitto/mosquitto.log
Initially /var/lib/mosquitto/mosquitto.db doesn't show up until several iterations have been run. My test app is here:
import (
	mqtt "github.com/eclipse/paho.mqtt.golang"
	log "github.com/sirupsen/logrus"
	"time"
)

var receivedMsg int

func Persist() {
	const TOPIC = "test"
	const URL = "tcp://localhost:1883"
	const USERNAME = "myuser"
	const PASSWORD = "mypassword"

	defer printReceived()

	options := mqtt.NewClientOptions().AddBroker(URL).SetUsername(USERNAME).SetPassword(PASSWORD)
	options.SetCleanSession(false)
	options.SetConnectRetry(true)
	options.SetConnectRetryInterval(10 * time.Millisecond)

	// register the receive client with broker / TOPIC
	// to be sure the broker knows it needs to save our messages
	// to deliver at a later time
	options.SetClientID("receive-client")
	client := mqtt.NewClient(options)
	token := client.Connect()
	token.Wait()
	if token := client.Subscribe(TOPIC, 2, consume1); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
	client.Disconnect(0)

	// connect with send client and send 10 messages
	options.SetClientID("send-client")
	client = mqtt.NewClient(options)
	token = client.Connect()
	token.Wait()
	client.Publish(TOPIC, 2, false, "message #1")
	client.Publish(TOPIC, 2, false, "message #2")
	client.Publish(TOPIC, 2, false, "message #3")
	client.Publish(TOPIC, 2, false, "message #4")
	client.Publish(TOPIC, 2, false, "message #5")
	client.Publish(TOPIC, 2, false, "message #6")
	client.Publish(TOPIC, 2, false, "message #7")
	client.Publish(TOPIC, 2, false, "message #8")
	client.Publish(TOPIC, 2, false, "message #9")
	client.Publish(TOPIC, 2, false, "message #10")
	client.Disconnect(4)

	time.Sleep(5 * time.Second)

	// subscribe again and try to retrieve the messages we missed
	options.SetClientID("receive-client")
	client = mqtt.NewClient(options)
	token = client.Connect()
	token.Wait()
	if token := client.Subscribe(TOPIC, 2, consume2); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
	time.Sleep(300 * time.Second)
	log.Warn("PAUSE")
	time.Sleep(300 * time.Second)
}

func consume1(client mqtt.Client, msg mqtt.Message) {
	receivedMsg++
	log.Warn("THIS SHOULD NOT BE CONSUMING ANY MESSAGES:", string(msg.Payload()))
}

func consume2(client mqtt.Client, msg mqtt.Message) {
	receivedMsg++
	log.Warn("GOT A MESSAGE:", string(msg.Payload()))
}

func printReceived() {
	log.Warn("received message count=", receivedMsg)
}
Upvotes: 0
Views: 149
Reputation: 59751
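Publishing at QoS 2 is a multi-step process, so the most likely reason is that you are disconnecting the publishing client before all the messages have actually finished publishing to the broker. client.Publish() returns a token immediately and the QoS 2 handshake completes in the background, so the client.Disconnect(4) call, which allows only 4 milliseconds for outstanding work, can cut off the last few messages.
You should publish in a loop and use the token returned from each call to client.Publish() to wait until that publish has completed before disconnecting the client.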
e.g. as shown in the example:
// Publish 5 messages to go-mqtt/sample at qos 1 and wait for the receipt
// from the server after sending each message
for i := 0; i < 5; i++ {
	text := fmt.Sprintf("this is msg #%d!", i)
	token := c.Publish("go-mqtt/sample", 1, false, text)
	token.Wait()
}
Upvotes: 3