moluzhui
moluzhui

Reputation: 1093

How to speed up the recovery of mqtt subscription messages after disconnected network reconnection

I have a project code in python which need to server send mqtt message to the device, It needs to solve the problem of device and cloud server application operation and information synchronization. I found that paho.mqtt!'s Client function has a parameter Client(client_id="", clean_session=True) to restore the unconnected subscription to receive the message sent during the offline. But what I found in my experiments is that the longer you disconnect, the longer it takes you to recover, and the slower your message will be.

I would very appreciate it if you guys can suggest me some advice to speed up recovery or any related references about Cloud-Device application operation and information synchronization.

I have written test disconnection recovery

Cloud

client = mqtt.Client(client_id='Server',clean_session=False)
client.enable_logger(logger)
client.on_connect = on_connect
client.on_message = on_message
client.connect(MQTT_IP,MQTT_PORT,60)
client.subscribe(topic="Cloud/#",qos=2)
client.loop_forever()

def on_message(client, userdata, msg):
    topic = msg.topic
    payload = msg.payload.decode('utf-8')
    print(topic)
    print(payload)
    pattern_create = 'Cloud/create'
    pattern_delete = 'Cloud/delete'
    pattern_modify = 'Cloud/modify'
    pattern_connect = 'Cloud/connect'
    if re.match(pattern_create, topic):
        pub("Device/create","create",MQTT_IP,MQTT_PORT)
    elif re.match(pattern_delete, topic):
        pub("Device/delete","delete",MQTT_IP,MQTT_PORT)
    elif re.match(pattern_modify, topic):
        pub("Device/modify","modify",MQTT_IP,MQTT_PORT)
    elif re.match(pattern_connect, topic):
        print('connected')
        pub("Device/connect","connected",MQTT_IP,MQTT_PORT)

Device

def on_message(client, userdata, msg):
    info = msg.payload.decode('utf-8')
    print("##################info=",info)

def wait_to_handle("Device/#", mqtt_host, mqtt_port):
    client = mqtt.Client(client_id="wait_to_handle", clean_session=False)
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(mqtt_host,mqtt_port,60)
    client.subscribe(topic=topics,qos=2)
    client.loop_forever()

def send_data_func():
    client = mqtt.Client('SendToCloud',clean_session=False)
    client.on_publish = on_publish
    client.connect(MQTT_IP, MQTT_PORT , 60)
    client.loop_start()
    count = 0
    while 1:
        print(count)
        client.publish('Cloud/count',str(count),2)
        count += 1
        time.sleep(2)

device result

...
much Device sent message
...
Device sent message
Device sent message
Device sent message
Device sent message
Device sent message
Device sent message
18666
18667
18668
wait handle : Connected with result code 0
##################info= create
##################info= delete
##################info= modify
##################info= create
##################info= delete
##################info= modify
##################info= create
##################info= delete
##################info= modify
##################info= create
##################info= delete
##################info= modify
18669
18670
18671
18672
18673
18674
Device sent message
Device sent message
Device sent message
Device sent message
...
...

Upvotes: 2

Views: 2172

Answers (1)

Skrino
Skrino

Reputation: 510

As you noted, using the persistent session by disabling the clean_session flag maintains all of the active subscriptions, such that the client need not re-subscribe, and queues up messages (only QoS 1 or QoS 2) if the client temporarily lost connection and reconnected.

The broker will queue up as many messages as it is configured to, which may be in the thousands -- hencewhy the time to recover is proportional to the time of disconnect.

Your options are:

  • Set clean_session flag to true, which would disable all queueing of missed messages but also require you to re-subscribe to topics of interest.
  • Keep clean_session flag as false to maintain your subscriptions, but the publisher to your topics must publish with QoS of 0 to bypass the queueing
    • Additionally, the publisher can set retained flag set to true on published messages, which will make the broker hold onto the latest message on that topic and send it to the client upon re-connect
  • Set clean_session flag to true, but make sure the producer publishes with retained flag set to true, which will make the broker hold onto the latest message on that topic and send it to the client upon re-connect but any QoS will be supported
  • Configure the broker to queue up fewer missed messages

Upvotes: 1

Related Questions