FrVaBe
FrVaBe

Reputation: 49341

MQTT Retained messages not received when subscribing same topic from different applications at the same time

TL;DR

When subscribing to the same topic tree with multiple clients at the same time, not all clients receive the retained messages as expected!

in detail / use case

In a real project several applications subscribe (nearly at the same time because they are started in parallel) to the same MQTT topic (with wildcard). The topic contains about 500 retained messages (each in an own sub topic level) which all applications are expected to receive (they are subscribing with QoS 1).

Beside the "configuration" messages also data topics are subscribed with the same MQTT connection. No persisted state is required (and wanted here). Therefore the application instances connect with cleanSession=true.

For my understanding it would be sufficient if the application instances would each connect with a fixed clientId as cleanSession=true should avoid any state handling. But to be really sure that no state is considered a unique MQTT clientId is generated for each connect.

observerd behavior

Unfortunately not all application instances get the retained messages.. Some get no messages at all from the topic - regardless of how long the subscription lasts. I first thought that the maxInflight (client side) or max_queued_messages (server side) configuration might be the reasons, but after increasing both to 500,000 I guess this is not the reasons behind the failure.

reproduction as test

Therefore I created this github project with a repro. There is a unit test class in the repro MqttSubscriptionTest with the test method multiThreadSubscriptionTest. When executing this test some (1000) retained messages will be published first in the @BeforeClass method. After that, 10 instances of a MqttSubscriber class which implements the IMqttMessageListener and Runnable interface will be instantiated and executed. Each MqttSubscriber instance will be executed in an own thread with an own MqttClient instance and will subscribe to the topic tree with the retained messages. This is logged to the console as follows:

----------- perform subscriptions
Subscriber-3 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-0 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-2 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-4 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-5 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-6 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-1 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-7 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-8 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-9 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'

The test will wait some time and after that validate the subscriptions. It is expected that each Subscriber received the 1000 retained messages:

----------- validate subscriptions
Subscriber-4: receivedMessages=1000; duration=372ms; succeeded=true
Subscriber-0: receivedMessages=1000; duration=265ms; succeeded=true
Subscriber-5: receivedMessages=1000; duration=475ms; succeeded=true
Subscriber-7: receivedMessages=0; duration=0ms; succeeded=false
Subscriber-6: receivedMessages=1000; duration=473ms; succeeded=true
Subscriber-8: receivedMessages=0; duration=0ms; succeeded=false
Subscriber-9: receivedMessages=1000; duration=346ms; succeeded=true
Subscriber-3: receivedMessages=1000; duration=243ms; succeeded=true
Subscriber-1: receivedMessages=1000; duration=470ms; succeeded=true
Subscriber-2: receivedMessages=1000; duration=357ms; succeeded=true

Most subscriber received the expected 1000 messages in a very short time (some hundreds ms). But some (here Subscriber-7/8) did not receive a single message (duration is 0 because they never finished). The situation is not better when giving the subscribers more time to receive the messages. They just won't get them.

I have no idea why this happens. No error messages are shown on the MQTT broker or client side. If you can give any help this would be super useful for me, because I dependent on the reliable delivery of retained messages.

Repro on GitHub: FrVaBe/MQTT/mqtt-client-showcase/

Upvotes: 0

Views: 3628

Answers (2)

FrVaBe
FrVaBe

Reputation: 49341

The HiveMQ people were kind enough to take a look at this problem. They suspect the cause in the Paho client and the use of the IMqttMessageListener in the subscribe. There is a described issue #432 of the presumed Race Condition.

Lesson learned: Better use MqttCallback instead of IMqttMessageListener

Upvotes: 4

Michael Simons
Michael Simons

Reputation: 4880

You may not use cleanSession(true) during connect, see explanation here: http://www.steves-internet-guide.com/mqtt-retained-messages-example/

I used mosquito for tests and it's internal message queue is only 100 by default.

HiveMQ has max-queued-messages.

Emq config has something similar http://emqtt.io/docs/v2/config.html#mqtt-message-queue.

I fixed the example code in the repo. Works for me.

Edit: Just tested it with Emq (docker run --rm -ti --name emq -p 18083:18083 -p 1883:1883 quodt/emq-docker:latest) and it works fine.

Basically, it's cleanSession: Must be false. Apart from that, the waiting states in your test codes are bad. They are too short on my machine. Use a latch or other real sync mechanism.

Upvotes: 0

Related Questions