user2892909
user2892909

Reputation: 81

Unable to receive all MQTT messages in Python

I am trying to send messages from one python script to another using MQTT. One script is a publisher. The second script is a subscriber. I send messages every 0.1 second.

Publisher:

client = mqtt.Client('DataReaderPub')
client.connect('127.0.0.1', 1883, 60)
print("MQTT parameters set.")

# Read from all files
count = 0
for i in range(1,51):
    payload = "Hello world" + str(count)
    client.publish(testtopic, payload, int(publisherqos))
    client.loop()
    count = count+1
    print(count, ' msg sent: ', payload)
    sleep(0.1)

Subscriber:

subclient = mqtt.Client("DynamicDetectorSub")
subclient.on_message = on_message
subclient.connect('127.0.0.1')

subclient.subscribe(testtopic, int(subscriberqos))

subclient.loop_forever()

mosquitto broker version - 3.1

mosquitto.conf has max inflight messages set to 0, persistence true.

publisher QOS = 2

subscriber QOS = 2

topic = 'test' in both scripts

When I run subscriber and publisher in the same script, the messages are sent and received as expected. But when they are in separate scripts, I do not receive all the messages and sometimes no messages. I run subscriber first and then publisher. I have tried subscriber with loop.start() and loop.stop() with waiting for few minutes.

I am unable to debug this problem. Any pointers would be great!

EDIT:

  1. I included client.loop() after publish. -> Same output as before
  2. When I printed out statements in 'on_connect' and 'on_disconnect', I noticed that client mqtt connection gets established and disconnects almost immediately. This happens every second. I even got this message once -

    [WinError 10053] An established connection was aborted by the software in your host machine

Keep Alive = 60

Is there any other parameter I should look at?

Upvotes: 1

Views: 4215

Answers (3)

user2892909
user2892909

Reputation: 81

It turned out to be a silly bug. As hardillb suggested I looked at the broker logs. It showed that the subscriber client was already connected. I am using Pycharm after a really really long time. So I had accidentally ran publisher and subscriber so many times that they were running in parallel in the output console. No wonder they got disconnected since the client IDs were the same. Sorry for the trouble. BTW client.loop() after publish is not needed. Thanks hardillb.

Upvotes: 1

larsks
larsks

Reputation: 312580

When I ran your code, the subscriber was often missing the last packet. I was not otherwise able to reproduce the problems you described.

If I rewrite the publisher like this instead...

from time import sleep
import paho.mqtt.client as mqtt

client = mqtt.Client('DataReaderPub')
client.connect('127.0.0.1', 1883, 60)
print("MQTT parameters set.")

client.loop_start()

# Read from all files
count = 0
for i in range(1,51):
    payload = "Hello world" + str(count)
    client.publish('test', payload, 2)
    count = count+1
    print(count, ' msg sent: ', payload)
    sleep(0.1)

client.loop_stop()
client.disconnect()

...then I no longer see the dropped packet. I'm using the start_loop/stop_loop methods here, which run the mqtt loop asynchronously. I'm not sure exactly what was causing your dropped packet, but I suspect that the final message was still in the publisher's send queue when the code exits.

Upvotes: 0

hardillb
hardillb

Reputation: 59816

You need to call the network loop function in the publisher as well so the client actually gets some time to do the IO (And the dual handshake for the QOS2).

Add client.loop() after the call to client.publish() in the client:

import paho.mqtt.client as mqtt
import time

client = mqtt.Client('DataReaderPub')
client.connect('127.0.0.1', 1883, 60)
print("MQTT parameters set.")

# Read from all files
count = 0
for i in range(1,51):
    payload = "Hello world" + str(count)
    client.publish("test", payload, 2)
    client.loop()
    count = count+1
    print(count, ' msg sent: ', payload)
    time.sleep(0.1)

Subscriber code:

import paho.mqtt.client as mqtt

def on_message(client, userdata, msg):
  print(msg.topic + " " + str(msg.payload))

subclient = mqtt.Client("DynamicDetectorSub")
subclient.on_message = on_message
subclient.connect('127.0.0.1')

subclient.subscribe("test", 2)

subclient.loop_forever()

Upvotes: 2

Related Questions