Reputation: 33
I am facing an issue that, after a few disconnects and reconnects, my mqtt client (let's say A
) stops receiving messages from another publishing client (B
).
If I subscribe manually with mosquitto_sub
to the topic B
is publishing, I can see that all messages get published as expected. If I publish manually (mosquitto_pub
) to the topic A
is subscribed to, A
receives these messages as well, so the subscription seems to work. Only if B
is publishing to the topic, A
is not receiving the messages.
Both clients are connected to a mosquitto broker (version 1.6.12). Everything is running on a RaspberryPi CM3. The clients are written in Go with the paho mqtt library (v1.4.2) and started as systemd services. The clients are initialized with following options:
opts.SetCleanSession(false)
opts.SetKeepAlive(10 * time.Second)
opts.SetPingTimeout(1 * time.Second)
opts.SetAutoReconnect(true)
opts.SetConnectTimeout(15 * time.Second)
Also both clients get a unique ID. I looked into the mosquitto logs and noticed that some messages get sent to the reconnecting client before he subscribed.
1678786214: Sending PUBLISH to telemetry (d1, q1, r0, m722, 'example-topic-1/a', ... (109 bytes))
1678786214: Sending PUBLISH to telemetry (d1, q1, r0, m723, 'example-topic-1/b', ... (118 bytes))
1678786214: Sending PUBLISH to telemetry (d1, q1, r0, m904, 'example-topic-1/a', ... (109 bytes))
1678786214: Sending PUBLISH to telemetry (d1, q1, r0, m1085, 'example-topic-1/a', ... (109 bytes))
1678786214: Sending PUBLISH to telemetry (d1, q1, r0, m1086, 'example-topic-1/b', ... (118 bytes))
1678786214: Sending PUBLISH to telemetry (d1, q1, r0, m1267, 'example-topic-1/a', ... (109 bytes))
...
...
1678786214: Sending PUBLISH to telemetry (d0, q1, r0, m2293, 'example-topic-1/c'1, ... (119 bytes))
1678786214: Received PUBLISH from telemetry (d0, q2, r1, m1, 'example-topic-2', ... (7 bytes))
1678786214: Sending PUBREC to telemetry (m1, rc0)
1678786214: Received PUBLISH from telemetry (d0, q2, r1, m2, 'example-topic-1/a', ... (109 bytes))
1678786214: Sending PUBREC to telemetry (m2, rc0)
1678786214: Received PUBREL from telemetry (Mid: 1)
1678786214: Sending PUBCOMP to telemetry (m1)
1678786214: Received PUBREL from telemetry (Mid: 2)
1678786214: Sending PUBCOMP to telemetry (m2)
1678786214: Received SUBSCRIBE from telemetry
1678786214: shutdown (QoS 2)
1678786214: telemetry 2 shutdown
1678786214: Sending SUBACK to telemetry
1678786214: Received SUBSCRIBE from telemetry
1678786214: example-topic-1/# (QoS 1)
1678786214: telemetry 1 example-topic-1/#
1678786214: Sending SUBACK to telemetry
In the documentation of the mqtt library it states
When QOS1+ subscriptions have been created previously and you connect with CleanSession set to false it is possible that the broker will deliver retained messages before Subscribe can be called. To process these messages either configure a handler with AddRoute or set a DefaultPublishHandler.
Does this mean the messages will just be lost, or is it actually blocking the topic somehow?
So far I tried updating mosquitto from initially version 1.6.10 to 1.6.12, which didn't resolve the issue. Restarting the mosquitto.service seems to fix the issue, but is not really a solution. My next step would be setting a DefaultPublishHandler
to process the messages that get sent too early.
Thanks in advance for any help you can provide. If more information is required, let me know!
I tried around a bit more and looked at the mosquitto logs and could narrow the problem down a bit, because it seems like it has to do with the QoS level of the messages.
While client A
is not receiving any messages client B
is sending, I tried registering a new client (with mosquitto_sub
), let's call it C
, who receives the messages from client B
regardless of QoS level of the subscription. When publishing with client C
, client A
receives only messages sent with QoS of 0. If I specify a QoS of 1 or 2, client A
receives nothing. Since client B
sends all messages with a QoS of 1, it seems like the broker is not sending any messages with a QoS > 0 to client A
, but to other clients.
The OrderMatters
is set to false for all clients and the only configuration of the broker is max_queued_messages 0
.
Here are some relevant snippets of the logs.
1678874048: Sending PUBACK to clientB (m732, rc0)
1678874048: Received PUBLISH from clientB (d0, q1, r0, m733, 'topic/b', ... (128 bytes))
1678874048: Sending PUBACK to clientB (m733, rc0)
1678874048: Received PUBLISH from clientB (d0, q1, r0, m734, 'topic/c', ... (135 bytes))
1678874048: Sending PUBACK to clientB (m734, rc0)
Notice how no messages get published to clientA.
1678874088: No will message specified.
1678874088: Sending CONNACK to mosq-glZkfjX79CriC0qOK5 (0, 0)
1678874088: Received PUBLISH from mosq-glZkfjX79CriC0qOK5 (d0, q1, r0, m1, 'topic/a', ... (130 bytes))
1678874088: Sending PUBACK to mosq-glZkfjX79CriC0qOK5 (m1, rc0)
1678874088: Received DISCONNECT from mosq-glZkfjX79CriC0qOK5
1678874088: Client mosq-glZkfjX79CriC0qOK5 disconnected.
Same if publishing with a new client.
1678874043: No will message specified.
1678874043: Sending CONNACK to mosq-pba5gDo7lNe7Sqz5jd (0, 0)
1678874043: Received PUBLISH from mosq-pba5gDo7lNe7Sqz5jd (d0, q0, r0, m0, 'topic/a', ... (130 bytes))
1678874043: Sending PUBLISH to clientA (d0, q0, r0, m0, 'topic/a', ... (130 bytes))
1678874043: Received DISCONNECT from mosq-pba5gDo7lNe7Sqz5jd
1678874043: Client mosq-pba5gDo7lNe7Sqz5jd disconnected.
Publishing with a new Client and QoS of 0 gets published to clientA.
1678874687: No will message specified.
1678874687: Sending CONNACK to mosq-vkwirAtvYxHwoLnCQH (0, 0)
1678874687: Received SUBSCRIBE from mosq-vkwirAtvYxHwoLnCQH
1678874687: topic/+ (QoS 1)
1678874687: mosq-vkwirAtvYxHwoLnCQH 1 topic/+
1678874687: Sending SUBACK to mosq-vkwirAtvYxHwoLnCQH
...
1678874695: Received PUBLISH from clientB (d0, q1, r0, m673, 'topic/a', ... (137 bytes))
1678874695: Sending PUBACK to clientB (m673, rc0)
1678874695: Sending PUBLISH to mosq-vkwirAtvYxHwoLnCQH (d0, q1, r0, m3, 'topic/a', ... (137 bytes))
1678874695: Received PUBLISH from clientB (d0, q1, r0, m674, 'topic/b', ... (130 bytes))
1678874695: Sending PUBACK to clientB (m674, rc0)
1678874695: Received PUBACK from mosq-vkwirAtvYxHwoLnCQH (Mid: 3, RC:0)
1678874695: Sending PUBLISH to mosq-vkwirAtvYxHwoLnCQH (d0, q1, r0, m4, 'topic/b', ... (130 bytes))
1678874695: Received PUBACK from mosq-vkwirAtvYxHwoLnCQH (Mid: 4, RC:0)
And subscribing with a new client with QoS 1 also works...
I am honestly a bit out of ideas right now, since the behaviour is quite weird imo. The callbackHandlers do not seem to block on clientA
s side, since messages with QoS 0 still get processed. Is there any setting or configuration that could mess with a QoS 1 subscription?
The following logs are basically before and after restarting the services, which resulted in the described behaviour. Since there are more than just those two clients constantly sending stuff I cut out some parts again, but everything regarding the two clients in question should be in here. From the first connect and a few working publishes to restarting and the messages not being sent anymore.
Another thing I noticed is that when shutting down the service, the client never calls Disconnect
. It tries to unsubscribe, which sometimes does not even get completed. Could this cause an issue like this?
1678880076: New client connected from 127.0.0.1 as clientA (p2, c0, k10).
1678880076: No will message specified.
1678880076: Sending CONNACK to clientA (1, 0)
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m18734, 'topic1/matlab_version_expected', ... (152 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m18735, 'topic1/coreagent_ota_state', ... (112 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m19039, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m19041, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m19331, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m19333, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m19453, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m19454, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m19575, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m19576, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m20053, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m20054, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m20235, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m20236, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m20349, 'topic1/ostree_sha_rollback', ... (172 bytes))
... (lots of PUBLISH withouth PUBACK from clientA)
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m20540, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Received PUBLISH from clientA (d0, q2, r1, m1, 'conn-status', ... (7 bytes))
1678880076: Sending PUBREC to clientA (m1, rc0)
1678880076: Received PUBLISH from clientA (d0, q2, r1, m2, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Sending PUBREC to clientA (m2, rc0)
1678880076: Received PUBREL from clientA (Mid: 1)
1678880076: Sending PUBCOMP to clientA (m1)
1678880076: Received PUBREL from clientA (Mid: 2)
1678880076: Sending PUBCOMP to clientA (m2)
1678880076: Sending PUBLISH to clientA (d0, q1, r0, m20720, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Received PUBLISH from clientA (d0, q2, r1, m3, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Sending PUBREC to clientA (m3, rc0)
1678880076: Received SUBSCRIBE from clientA
1678880076: shutdown (QoS 2)
1678880076: clientA 2 shutdown
1678880076: Sending SUBACK to clientA
1678880076: Received PUBREL from clientA (Mid: 3)
1678880076: Sending PUBCOMP to clientA (m3)
1678880076: Sending PUBLISH to clientA (d0, q1, r0, m20721, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Received SUBSCRIBE from clientA
1678880076: telemetry/+ (QoS 1)
1678880076: clientA 1 telemetry/+
1678880076: Sending SUBACK to clientA
1678880076: Sending PUBLISH to clientA (d0, q1, r1, m20722, 'telemetry/geolocation', ... (51 bytes))
1678880076: Received PUBACK from clientA (Mid: 20722, RC:0)
1678880076: Received SUBSCRIBE from clientA
1678880076: telemetry-batch (QoS 1)
1678880076: clientA 1 telemetry-batch
1678880076: Sending SUBACK to clientA
1678880076: Received SUBSCRIBE from clientA
1678880076: topic1/# (QoS 1)
1678880076: clientA 1 topic1/#
1678880076: Sending SUBACK to clientA
1678880076: Sending PUBLISH to clientA (d0, q1, r1, m20723, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Sending PUBLISH to clientA (d0, q1, r1, m20724, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Received PUBACK from clientA (Mid: 20723, RC:0)
1678880076: Received PUBACK from clientA (Mid: 20724, RC:0)
1678880076: Received PUBLISH from clientA (d0, q2, r1, m8, 'conn-status', ... (6 bytes))
1678880076: Sending PUBREC to clientA (m8, rc0)
1678880076: Received PUBREL from clientA (Mid: 8)
1678880076: Sending PUBCOMP to clientA (m8)
...
1678880083: New connection from 127.0.0.1 on port 1883.
1678880083: New client connected from 127.0.0.1 as clientB (p2, c0, k10).
1678880083: No will message specified.
1678880083: Sending CONNACK to clientB (1, 0)
1678880085: Received PUBLISH from clientA (d0, q2, r1, m9, 'session', ... (36 bytes))
1678880085: Sending PUBREC to clientA (m9, rc0)
1678880085: Received PUBREL from clientA (Mid: 9)
1678880085: Sending PUBCOMP to clientA (m9)
...
1678880086: Received PUBLISH from clientB (d0, q1, r0, m3, 'topic1/net_response', ... (118 bytes))
1678880086: Sending PUBACK to clientB (m3, rc0)
1678880086: Sending PUBLISH to clientA (d0, q1, r0, m20725, 'topic1/net_response', ... (118 bytes))
1678880086: Received PUBACK from clientA (Mid: 20725, RC:0)
1678880086: Received PUBLISH from clientB (d0, q1, r0, m4, 'topic1/public_ip_addr', ... (116 bytes))
1678880086: Sending PUBACK to clientB (m4, rc0)
1678880086: Sending PUBLISH to clientA (d0, q1, r0, m20726, 'topic1/public_ip_addr', ... (116 bytes))
1678880086: Received PUBACK from clientA (Mid: 20726, RC:0)
... (lots of PUBLISH with PUBACK from telemetry)
1678880086: Sending PUBLISH to clientA (d0, q1, r0, m20742, 'topic1/swap', ... (169 bytes))
1678880086: Received PUBACK from clientA (Mid: 20742, RC:0)
1678880086: Received PUBLISH from clientB (d0, q1, r0, m21, 'topic1/syslog', ... (508219 bytes))
1678880086: Sending PUBACK to clientB (m21, rc0)
1678880086: Sending PUBLISH to clientA (d0, q1, r0, m20743, 'topic1/syslog', ... (508219 bytes))
1678880086: Received PUBACK from clientA (Mid: 20743, RC:0)
...
1678880088: Received PUBLISH from clientB (d0, q1, r0, m22, 'topic1/version_ca_lockdown', ... (133 bytes))
1678880088: Sending PUBACK to clientB (m22, rc0)
... (here the services get restarted and afterwards the subscription seems broken for QoS >= 1)
1678880118: Received UNSUBSCRIBE from clientA
1678880118: telemetry/+
1678880118: clientA telemetry/+
1678880118: Sending UNSUBACK to clientA
1678880118: Socket error on client clientA, disconnecting.
1678880119: Received PUBLISH from clientB (d0, q1, r0, m136, 'topic1/release_id', ... (126 bytes))
1678880119: Sending PUBACK to clientB (m136, rc0)
1678880119: Received PUBLISH from clientB (d0, q1, r0, m137, 'topic1/version_ca_configurator', ... (137 bytes))
1678880119: Sending PUBACK to clientB (m137, rc0)
...
1678880125: New connection from 127.0.0.1 on port 1883.
1678880125: New client connected from 127.0.0.1 as clientA (p2, c0, k10).
1678880125: No will message specified.
1678880125: Sending CONNACK to clientA (1, 0)
1678880125: Sending PUBLISH to clientA (d1, q1, r0, m18734, 'topic1/matlab_version_expected', ... (152 bytes))
1678880125: Sending PUBLISH to clientA (d1, q1, r0, m18735, 'topic1/coreagent_ota_state', ... (112 bytes))
... (again, lots of PUBLISH without PUBACK)
1678880125: Sending PUBLISH to clientA (d0, q1, r0, m20873, 'topic1/proc_led', ... (151 bytes))
1678880125: Sending PUBLISH to clientA (d0, q1, r0, m20874, 'topic1/coreagent_ota_state', ... (112 bytes))
1678880125: Sending PUBLISH to clientA (d0, q1, r0, m20875, 'topic1/ssid_wlan0', ... (99 bytes))
1678880125: Received PUBLISH from clientA (d0, q2, r1, m1, 'conn-status', ... (7 bytes))
1678880125: Sending PUBREC to clientA (m1, rc0)
1678880125: Received PUBLISH from clientA (d0, q2, r1, m2, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880125: Sending PUBREC to clientA (m2, rc0)
1678880125: Received PUBREL from clientA (Mid: 1)
1678880125: Sending PUBCOMP to clientA (m1)
1678880125: Received PUBREL from clientA (Mid: 2)
1678880125: Sending PUBCOMP to clientA (m2)
1678880125: Received SUBSCRIBE from clientA
1678880125: shutdown (QoS 2)
1678880125: clientA 2 shutdown
1678880125: Sending SUBACK to clientA
1678880125: Received PUBLISH from clientA (d0, q2, r1, m4, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880125: Sending PUBREC to clientA (m4, rc0)
1678880125: Received SUBSCRIBE from clientA
1678880125: telemetry/+ (QoS 1)
1678880125: clientA 1 telemetry/+
1678880125: Sending SUBACK to clientA
1678880125: Received PUBREL from clientA (Mid: 4)
1678880125: Sending PUBCOMP to clientA (m4)
1678880125: Received SUBSCRIBE from clientA
1678880125: telemetry-batch (QoS 1)
1678880125: clientA 1 telemetry-batch
1678880125: Sending SUBACK to clientA
1678880125: Received SUBSCRIBE from clientA
1678880125: topic1/# (QoS 1)
1678880125: clientA 1 topic1/#
1678880125: Sending SUBACK to clientA
1678880126: Received PUBLISH from clientA (d0, q2, r1, m8, 'conn-status', ... (6 bytes))
1678880126: Sending PUBREC to clientA (m8, rc0)
1678880126: Received PUBREL from clientA (Mid: 8)
1678880126: Sending PUBCOMP to clientA (m8)
...
1678880133: New connection from 127.0.0.1 on port 1883.
1678880133: New client connected from 127.0.0.1 as clientB (p2, c0, k10).
1678880133: No will message specified.
1678880133: Sending CONNACK to clientB (1, 0)
...
1678880135: Received PUBLISH from clientB (d0, q1, r0, m3, 'topic1/provision_date', ... (131 bytes))
1678880135: Sending PUBACK to clientB (m3, rc0)
1678880135: Received PUBLISH from clientB (d0, q1, r0, m4, 'topic1/processes', ... (99 bytes))
... (There are no more PUBLISH message from broker to clientA)
1678880135: Sending PUBACK to clientB (m20, rc0)
1678880135: Received PUBLISH from clientB (d0, q1, r0, m21, 'topic1/net_response', ... (118 bytes))
1678880135: Sending PUBACK to clientB (m21, rc0)
I am still working on a minimal, reproducible example, but unfortunately no luck so far in recreating the error outside of the embedded environment.
Just for completeness, I managed to build a minimal, reproducible example and thought I'll post it as well.
package main
import (
"os"
"os/signal"
"syscall"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/sirupsen/logrus"
)
func main() {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
stop := make(chan bool, 1)
optsA := mqtt.NewClientOptions().AddBroker("tcp://localhost:1883")
logrus.Info("setup and connect clientA")
clientA := setup(optsA, "clientA")
if err := connect(clientA); err != nil {
panic(err)
}
if tok := clientA.Subscribe("topic/+", 1, func(c mqtt.Client, m mqtt.Message) {
logrus.Infof("received on topic %s ; message: %s", string(m.Topic()), string(m.Payload()))
}); tok.Wait() && tok.Error() != nil {
panic(tok.Error())
}
logrus.Infof("setup and connect clientB")
optsB := mqtt.NewClientOptions().AddBroker("tcp://localhost:1883")
clientB := setup(optsB, "clientB")
if err := connect(clientB); err != nil {
panic(err)
}
logrus.Info("start publishing from clientB")
go publish(clientB, stop)
logrus.Info("let clientA receive some messages from clientB")
time.Sleep(500 * time.Millisecond)
clientA.Disconnect(10)
logrus.Info("wait until ClientB published more than max_inflight_messages")
time.Sleep(1100 * time.Millisecond)
logrus.Infof("connecting clientA again")
clientA = setup(optsA, "clientA")
if err := connect(clientA); err != nil {
panic(err)
}
logrus.Info("wait shortly before subscribing")
time.Sleep(1 * time.Second)
logrus.Info("subscribe with clientA")
if tok := clientA.Subscribe("topic/+", 1, func(c mqtt.Client, m mqtt.Message) {
logrus.Infof("received on topic %s ; message: %s", string(m.Topic()), string(m.Payload()))
}); tok.Wait() && tok.Error() != nil {
panic(tok.Error())
}
<-c
stop <- true
}
func publish(client mqtt.Client, stop chan bool) {
for {
select {
case <-stop:
return
default:
if tok := client.Publish("topic/exmaple", 1, false, "message"); tok.Wait() && tok.Error() != nil {
logrus.WithError(tok.Error()).Warnf("failed to publish, continuing")
continue
}
time.Sleep(50 * time.Millisecond)
}
}
}
func connect(client mqtt.Client) error {
if tok := client.Connect(); tok.Wait() && tok.Error() != nil {
logrus.WithError(tok.Error()).Error("failed to connect")
return tok.Error()
}
return nil
}
func setup(opts *mqtt.ClientOptions, id string) mqtt.Client {
opts.SetClientID(id)
opts.SetOrderMatters(false)
opts.SetCleanSession(false)
// opts.SetDefaultPublishHandler(func(client mqtt.Client, msg mqtt.Message) {
// logrus.Infof("received message on topic %s which does not match any subscriptions (yet)", msg.Topic())
// })
opts.SetKeepAlive(10 * time.Second)
opts.SetPingTimeout(1 * time.Second)
opts.SetAutoReconnect(true)
opts.SetConnectTimeout(15 * time.Second)
cl := mqtt.NewClient(opts)
return cl
}
And just as @Brits explained in his answer, if I uncomment the DefaultPublishHandler
, the messages get acknowledged and the subscription works for further messages.
Upvotes: 3
Views: 4171
Reputation: 18290
Thanks for the logs; the fact that Mosquitto is not receiving a PUBACK
for the messages received immediately after the connection is established led me to the likely cause.
With Mosquitto v1.6.x max_inflight_messages
defaults to 10; so after 10 unacknowledged messages Mosquitto will not send any more messages. This is why it stops sending to clientA
.
paho.mqtt.golang
will not acknowledge a message if there is no handler (if you enable logging it will output a warning when this happens). The rationale for this is lost in the mists of time (I added the warning) but I suspect it's because, without a handler, the message cannot be said to have been processed (so should not be acknowledged). Earlier versions of Mosquitto used to resend messages that had not been acknowledged, but this is no longer the case (and prohibited in the v5 spec), meaning that it's effectively a permanent block.
In your case these two factors are combining; you connect, receive 10 PUBLISH
packets, and then subscribe (setting up the handlers) but by that point Mosquitto has 10 messages in flight and will not send any more.
The fix is to add:
opts.SetDefaultPublishHandler(func(mqtt.Client, mqtt.Message) {})
This will add a default publish handler (that ignores the messages); the fact that the handler exists means that PUBLISH
packets will be acknowledged.
Does ClientA
need messages received whilst offline (it's unsubscribing from telemetry/+
so will not get those anyway). If not then using opts.SetCleanSession(true)
is another way of avoiding this issue.
If you do need to process the messages then use AddRoute
to configure your message handlers before you connect (I generally have a catch-all DefaultPublishHandler
that just logs the message so I can see that something has been missed).
Upvotes: 1