Arlo Guthrie

Reputation: 1234

Mosquitto subscriber receiving extra message

I am exploring Mosquitto as a complete beginner. I have a Go test program that subscribes to a topic and publishes three messages to it so I can see how the broker handles them, but every run after the first shows one extra message received.

import (
    mqtt "github.com/eclipse/paho.mqtt.golang"
    log "github.com/sirupsen/logrus"
    "strconv"
    "time"
)

var sent int
var received int

func Test() {
    sent = 0
    received = 0
    const TOPIC = "mytopic/test"

    opts := mqtt.NewClientOptions().AddBroker("tcp://localhost:1883").SetClientID("client1").SetResumeSubs(false).SetAutoReconnect(false).SetCleanSession(true)

    client1 := mqtt.NewClient(opts)
    if token := client1.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }
    if token := client1.Subscribe(TOPIC, 2, receiver1); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }

    defer howManySent()
    defer howManyReceived()
    defer client1.Disconnect(1)

    for index := 0; index < 3; index++ {
        sendMsg := "client1 message " + strconv.Itoa(index)
        if token := client1.Publish(TOPIC, 2, true, sendMsg); token.Wait() && token.Error() != nil {
            panic(token.Error())
        } else {
            sent++
        }
        time.Sleep(1 * time.Second)
    }
    time.Sleep(1*time.Second)
}

func howManyReceived() {
    log.Warn("RECEIVED:", received)
}

func howManySent() {
    log.Warn("SENT:", sent)
}

func receiver1(client mqtt.Client, msg mqtt.Message) {
    received++
    msg.Ack()
    output := "message id:" + strconv.Itoa(int(msg.MessageID())) + " message = " + string(msg.Payload())
    log.Warn(output)
}

Output for the first run:

WARN[0000] message id:1 message = client1 message 0     
WARN[0001] message id:2 message = client1 message 1     
WARN[0002] message id:3 message = client1 message 2     
WARN[0004] RECEIVED:3                                   
WARN[0004] SENT:3      

Output for the second run:

WARN[0000] message id:4 message = client1 message 2     
WARN[0000] message id:5 message = client1 message 0     
WARN[0001] message id:6 message = client1 message 1     
WARN[0002] message id:7 message = client1 message 2     
WARN[0004] RECEIVED:4                                   
WARN[0004] SENT:3    

My configuration file is the default installed with mosquitto, except I turned off persistence to see if it helped:

# Place your local configuration in /etc/mosquitto/conf.d/
#
# A full description of the configuration file is at
# /usr/share/doc/mosquitto/examples/mosquitto.conf.example

pid_file /var/run/mosquitto.pid

persistence false
persistence_location /var/lib/mosquitto/

log_dest file /var/log/mosquitto/mosquitto.log

include_dir /etc/mosquitto/conf.d

What am I doing wrong?

Upvotes: 0

Views: 391

Answers (1)

Brits

Reputation: 18245

Publish is defined as follows:

Publish(topic string, qos byte, retained bool, payload interface{}) Token

The third argument is retained, so when you call token := client1.Publish(TOPIC, 2, true, sendMsg) you are publishing each message with the retain flag set. The flag is passed with the message to the broker, and the MQTT specification defines it as follows:

If the RETAIN flag is set to 1, in a PUBLISH Packet sent by a Client to a Server, the Server MUST store the Application Message and its QoS, so that it can be delivered to future subscribers whose subscriptions match its topic name [MQTT-3.3.1-5]. When a new subscription is established, the last retained message, if any, on each matching topic name MUST be sent to the subscriber [MQTT-3.3.1-6].

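Each new retained message replaces the previous one on that topic, so when your first run completes, "client1 message 2" is the retained message. On the second run the broker delivers it as soon as the client subscribes, before any of the new messages are published, which is why it appears first and RECEIVED is 4. The results you are seeing are therefore expected. If you change the call to token := client1.Publish(TOPIC, 2, false, sendMsg), no new retained messages are stored. Note that the message already retained by earlier runs stays on the broker until it is cleared, for example by publishing a zero-length payload with retain set to the same topic or, since persistence is off, by restarting Mosquitto. After that, each run receives exactly three messages.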
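To make the replace-and-replay behaviour concrete, here is a minimal sketch of a toy in-memory broker that follows only the two rules quoted from the specification. It is not how Mosquitto is implemented and it does not use the Paho client; the names toyBroker, message and runOnce are hypothetical and exist only for this illustration.

```go
package main

import "fmt"

// message is a minimal stand-in for an MQTT PUBLISH packet (hypothetical type).
type message struct {
	topic   string
	payload string
	retain  bool
}

// toyBroker is a hypothetical model of retained-message handling.
// It only mirrors the spec rules quoted above, nothing else.
type toyBroker struct {
	retained    map[string]string          // topic -> last retained payload
	subscribers map[string][]func(message) // topic -> live subscriber callbacks
}

func newToyBroker() *toyBroker {
	return &toyBroker{
		retained:    map[string]string{},
		subscribers: map[string][]func(message){},
	}
}

// subscribe registers a handler and immediately replays the retained
// message for the topic, if one is stored.
func (b *toyBroker) subscribe(topic string, h func(message)) {
	b.subscribers[topic] = append(b.subscribers[topic], h)
	if p, ok := b.retained[topic]; ok {
		h(message{topic: topic, payload: p, retain: true})
	}
}

// disconnectAll drops every subscription, like clean-session clients leaving.
func (b *toyBroker) disconnectAll() {
	b.subscribers = map[string][]func(message){}
}

// publish stores (or, for an empty payload, clears) the retained message
// when retain is set, then forwards the message to live subscribers.
func (b *toyBroker) publish(m message) {
	if m.retain {
		if m.payload == "" {
			delete(b.retained, m.topic)
		} else {
			b.retained[m.topic] = m.payload
		}
	}
	m.retain = false // copies sent to existing subscriptions carry retain = 0
	for _, h := range b.subscribers[m.topic] {
		h(m)
	}
}

// runOnce mimics one execution of the test program from the question:
// subscribe, publish three messages, disconnect. It returns what was received.
func runOnce(b *toyBroker, retain bool) []string {
	var got []string
	b.subscribe("mytopic/test", func(m message) { got = append(got, m.payload) })
	for i := 0; i < 3; i++ {
		b.publish(message{
			topic:   "mytopic/test",
			payload: fmt.Sprintf("client1 message %d", i),
			retain:  retain,
		})
	}
	b.disconnectAll()
	return got
}

func main() {
	b := newToyBroker()
	fmt.Println(len(runOnce(b, true))) // first run: nothing retained yet
	fmt.Println(len(runOnce(b, true))) // second run: retained message replayed first
	// Clear the retained message with a zero-length retained payload.
	b.publish(message{topic: "mytopic/test", payload: "", retain: true})
	fmt.Println(len(runOnce(b, false))) // retain off and nothing stored
}
```

In this model the three runs receive 3, 4 and 3 messages respectively, and the extra message in the second run is "client1 message 2", matching the output in the question.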

The mosquitto docs say the following about the persistence flag:

If true, connection, subscription and message data will be written to the disk in mosquitto.db at the location dictated by persistence_location. When mosquitto is restarted, it will reload the information stored in mosquitto.db

So this setting only has an impact when Mosquitto is restarted; while the broker is running, retained messages and session information are still held in memory even if persistence is set to false.

Upvotes: 1
