Arlo Guthrie

Reputation: 1234

inconsistent persistence in mosquitto

I am seeing inconsistent message delivery with message persistence and QoS 2 on mosquitto. Am I doing something wrong?

I have a simple test app that connects with clientId="receive-client", subscribes to a topic, and immediately disconnects. It then connects as clientId="send-client", publishes 10 messages ("message #1" ... "message #10"), and disconnects. After waiting five seconds, it reconnects as "receive-client" to consume the stored messages, printing and counting each one it receives.

The result is inconsistent: sometimes I receive 6 messages, sometimes 8. Typical output looks like this:

WARN[0005] GOT A MESSAGE:message #1                     
WARN[0005] GOT A MESSAGE:message #2                     
WARN[0005] GOT A MESSAGE:message #3                     
WARN[0005] GOT A MESSAGE:message #4                     
WARN[0005] GOT A MESSAGE:message #5                     
WARN[0005] GOT A MESSAGE:message #6                     
WARN[0005] GOT A MESSAGE:message #7                     
WARN[0005] GOT A MESSAGE:message #8                     
WARN[0305] PAUSE                                        
WARN[0605] received message count=8                     

My mosquitto version is 1.4.15. My mosquitto.conf is:

pid_file /var/run/mosquitto.pid

persistence true
persistence_location /var/lib/mosquitto/

allow_anonymous false
password_file /etc/mosquitto/passwd

log_dest file /var/log/mosquitto/mosquitto.log

Initially, /var/lib/mosquitto/mosquitto.db does not appear until several iterations have been run. My test app is here:

import (
    mqtt "github.com/eclipse/paho.mqtt.golang"
    log "github.com/sirupsen/logrus"
    "time"
)

var receivedMsg int

func Persist() {
    const TOPIC = "test"
    const URL = "tcp://localhost:1883"
    const USERNAME = "myuser"
    const PASSWORD = "mypassword"

    defer printReceived()

    options := mqtt.NewClientOptions().AddBroker(URL).SetUsername(USERNAME).SetPassword(PASSWORD)
    options.SetCleanSession(false)
    options.SetConnectRetry(true)
    options.SetConnectRetryInterval(10 * time.Millisecond)

    // register the receive client with broker / TOPIC
    // to be sure the broker knows it needs to save our messages
    // to deliver at a later time
    options.SetClientID("receive-client")
    client := mqtt.NewClient(options)
    token := client.Connect()
    token.Wait()
    if token := client.Subscribe(TOPIC, 2, consume1); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }
    client.Disconnect(0)

    // connect with send client and send 10 messages
    options.SetClientID("send-client")
    client = mqtt.NewClient(options)
    token = client.Connect()
    token.Wait()

    client.Publish(TOPIC, 2, false, "message #1")
    client.Publish(TOPIC, 2, false, "message #2")
    client.Publish(TOPIC, 2, false, "message #3")
    client.Publish(TOPIC, 2, false, "message #4")
    client.Publish(TOPIC, 2, false, "message #5")
    client.Publish(TOPIC, 2, false, "message #6")
    client.Publish(TOPIC, 2, false, "message #7")
    client.Publish(TOPIC, 2, false, "message #8")
    client.Publish(TOPIC, 2, false, "message #9")
    client.Publish(TOPIC, 2, false, "message #10")
    client.Disconnect(4)
    time.Sleep(5 * time.Second)

    // subscribe again and try to retrieve the messages we missed
    options.SetClientID("receive-client")
    client = mqtt.NewClient(options)
    token = client.Connect()
    token.Wait()

    if token := client.Subscribe(TOPIC, 2, consume2); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }

    time.Sleep(300 * time.Second)
    log.Warn("PAUSE")
    time.Sleep(300 * time.Second)
}

func consume1(client mqtt.Client, msg mqtt.Message) {
    receivedMsg++
    log.Warn("THIS SHOULD NOT BE CONSUMING ANY MESSAGES:", string(msg.Payload()))
}

func consume2(client mqtt.Client, msg mqtt.Message) {
    receivedMsg++
    log.Warn("GOT A MESSAGE:", string(msg.Payload()))
}

func printReceived() {
    log.Warn("received message count=", receivedMsg)
}

Upvotes: 0

Views: 149

Answers (1)

hardillb

Reputation: 59751

Publishing at QoS 2 is a multi-step process (the client and broker exchange PUBLISH, PUBREC, PUBREL, and PUBCOMP packets), so the most likely cause is that you are disconnecting the publishing client before all the messages have finished publishing to the broker. You should probably publish in a loop and use the token returned from each call to client.Publish() to wait until it has completed before disconnecting the client.

e.g. as shown in the example:

//Publish 5 messages to go-mqtt/sample at qos 1 and wait for the receipt
//from the server after sending each message
for i := 0; i < 5; i++ {
  text := fmt.Sprintf("this is msg #%d!", i)
  token := c.Publish("go-mqtt/sample", 1, false, text)
  token.Wait()
}

Upvotes: 3
