Reputation: 5017
I am trying to combine two features of HiveMQ: shared subscriptions and persistent sessions.
I have created a very simple message producer and a very simple consumer. When running multiple consumers, all consumers receive all messages.
After setting cleanSession to 'false' for the consumers, when I run a consumer and then restart it, the consumer also receives the messages that were published while it was not connected. Excellent.
Now I combine this with the shared subscription feature. When using only shared subscriptions, with cleanSession set to 'true' and multiple consumers running, each message is received by a single consumer. It should be round-robin, and that is indeed the case, but as soon as you stop a consumer the messages are no longer distributed round-robin: one of the consumers gets significantly more messages than the other(s).
If I now enable persistent sessions again (cleanSession is 'false') and start the shared subscription consumers, the consumers start to receive all messages again, instead of each message being delivered to just one client.
What is the issue here? Is this a bug in HiveMQ? Can persistent sessions and shared subscriptions not be used together? That would really be a bummer.
UPDATE 15/2/2017 As @fraschbi suggested I cleared all data and retested the shared subscription with persistent session consumers again. It seems to work!
What is strange, though, is that the missed messages are only received once the 1st consumer reconnects. All consumers run the same code; they are just started with different clientId arguments. See code below. My test sequence:
So my new question is: why does only the 1st consumer receive the lost messages?
Note: the trick here is still not to unsubscribe when stopping the client, because then the subscription/persistence setting is lost!
Producer.scala
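import java.util.UUID
// The class names used here match the Eclipse Paho Java client.
import org.eclipse.paho.client.mqttv3.{MqttClient, MqttMessage}

object Producer extends App {
  val topic = args(0)
  val brokerUrl = "tcp://localhost:1883"
  // A random client ID is enough here: the producer does not need a persistent session.
  val clientId = UUID.randomUUID().toString
  val client = new MqttClient(brokerUrl, clientId)
  client.connect()
  val theTopic = client.getTopic(topic)
  var count = 0

  // Disconnect cleanly when the JVM shuts down (e.g. on Ctrl+C).
  sys.addShutdownHook {
    println("Disconnecting client...")
    client.disconnect()
    println("Disconnected.")
  }

  // Publish one numbered message per second until the process is stopped.
  while (true) {
    val msg = new MqttMessage(s"Message: $count".getBytes())
    theTopic.publish(msg)
    println(s"Published: $msg")
    Thread.sleep(1000)
    count += 1
  }
}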
Consumer.scala
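// The class names used here match the Eclipse Paho Java client.
import org.eclipse.paho.client.mqttv3.{IMqttDeliveryToken, MqttCallback, MqttClient, MqttConnectOptions, MqttMessage}

object Consumer extends App {
  val topic = args(0)
  val brokerUrl = "tcp://localhost:1883"
  // A fixed client ID is required so the broker can find the persistent session again.
  val clientId = args(1)
  // val clientId = UUID.randomUUID().toString
  val client = new MqttClient(brokerUrl, clientId)

  client.setCallback(new MqttCallback {
    override def deliveryComplete(token: IMqttDeliveryToken): Unit = ()
    override def messageArrived(topic: String, message: MqttMessage): Unit =
      println(s"received on topic '$topic': ${new String(message.getPayload)}")
    override def connectionLost(cause: Throwable): Unit = println("Connection lost")
  })

  println(s"Start $clientId consuming from topic: $topic")
  val options = new MqttConnectOptions()
  // cleanSession = false asks the broker for a persistent session.
  options.setCleanSession(false)
  client.connect(options)
  client.subscribe(topic)

  sys.addShutdownHook {
    println("Disconnecting client...")
    // Deliberately not unsubscribing: that would remove the persisted subscription.
    // client.unsubscribe(topic)
    client.disconnect()
    println("Disconnected.")
  }

  // Keep the main thread alive without spinning the CPU.
  while (true) {
    Thread.sleep(1000)
  }
}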
Upvotes: 3
Views: 819
Reputation: 533
I will try to answer the two issues you are experiencing separately.
"It should be round-robin, and that is indeed the case, but as soon as you stop a consumer the messages are no longer distributed round-robin: one of the consumers gets significantly more messages than the other(s)."
HiveMQ prefers online clients when distributing messages for shared subscriptions.
"If I now enable persistent sessions again (cleanSession is 'false') and start the shared subscription consumers, the consumers start to receive all messages again, instead of each message being delivered to just one client."
In the beginning of your question, you said you are connecting clients with cleanSession=false to the broker and subscribing to the topic. (It sounds as if you're only using a single topic.)
Is it possible that you are not unsubscribing these clients before reconnecting with cleanSession=false and shared subscriptions? In that case, the subscriptions from the first step of your scenario would still be persisted for those clients, and naturally they would each receive all the messages.
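To illustrate the suspicion above, here is a minimal sketch in plain Scala. It does not talk to a broker: `PersistedSession` is a hypothetical in-memory stand-in for the subscriptions a persistent session keeps, and `Subscriptions`, `sharedFilter` and `switchToShared` are names made up for this example. The `$share:GROUP:TOPIC` filter syntax is an assumption based on HiveMQ 3.x; check the documentation for your version.

```scala
// Minimal stand-in for the two client calls used below. Paho's MqttClient
// has subscribe(String) and unsubscribe(String) methods of the same shape.
trait Subscriptions {
  def subscribe(topicFilter: String): Unit
  def unsubscribe(topicFilter: String): Unit
}

// Hypothetical in-memory model of what a persistent session keeps on the broker.
final class PersistedSession extends Subscriptions {
  private var filters = Set.empty[String]
  def subscribe(topicFilter: String): Unit = filters += topicFilter
  def unsubscribe(topicFilter: String): Unit = filters -= topicFilter
  def current: Set[String] = filters
}

object SharedSubscriptionSketch {
  // Assumption: the HiveMQ 3.x shared subscription syntax "$share:GROUP:TOPIC".
  def sharedFilter(group: String, topic: String): String =
    "$share:" + group + ":" + topic

  // Drop the old plain subscription before adding the shared one, so the
  // session no longer matches every message through the plain filter.
  def switchToShared(client: Subscriptions, group: String, topic: String): Unit = {
    client.unsubscribe(topic)
    client.subscribe(sharedFilter(group, topic))
  }
}
```

If a session that already holds the plain subscription only adds the shared one, it ends up with both filters, which would explain every consumer receiving every message. With a real client, the same idea would be a one-time `client.unsubscribe(topic)` before subscribing to the shared filter.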
EDIT:
So my new question is: why does only the 1st consumer receive the lost messages?
From the HiveMQ User Guide:
When a clients offline queue is full, the message for that client won’t be dropped but queued for the next offline client in a shared subscription group.
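When all clients are offline, the distribution is no longer round robin. Instead, messages are queued for one offline client until its queue is full, and only then for the next one. So the scenario you're describing is within expected behaviour.
The default maximum size of the message queue is 1000. So to see messages queued for the other consumers as well, you can either send more than 1000 messages while the clients are offline, or decrease the message queue size.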
...
<persistence>
    <queued-messages>
        <max-queued-messages>50</max-queued-messages>
    </queued-messages>
    ...
</persistence>
...
Add this to your config.xml to decrease the message queue size.
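The effect of the queue size can be sketched with a small simulation. This is a simplified model of the quoted User Guide sentence, not HiveMQ's actual implementation: it assumes all clients of the group are offline and that each message goes to the first client whose queue still has room. The names `OfflineQueueSketch` and `distribute` are made up for this example, and what the broker does once every queue is full is not modelled.

```scala
object OfflineQueueSketch {
  // Returns how many messages end up queued per client when every client
  // in the shared subscription group is offline.
  def distribute(clientIds: Seq[String], maxQueued: Int, messages: Int): Map[String, Int] = {
    val empty = clientIds.map(id => id -> 0).toMap
    (1 to messages).foldLeft(empty) { (queued, _) =>
      // Pick the first client whose offline queue is not yet full.
      clientIds.find(id => queued(id) < maxQueued) match {
        case Some(id) => queued.updated(id, queued(id) + 1)
        case None     => queued // all queues full; this sketch simply stops queueing
      }
    }
  }
}
```

With three offline consumers and 120 missed messages, `distribute(Seq("c1", "c2", "c3"), 1000, 120)` puts all 120 messages on `c1`, which matches what you observed. With the queue size reduced to 50, the same call yields 50 for `c1`, 50 for `c2` and 20 for `c3`.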
Upvotes: 3