jnd

Reputation: 754

Thread issue while subscribing to MQTT in Python using Paho MQTT

I have a Python program that listens to an MQTT topic and needs to process each message. I pass a number of command-line arguments that control how the message is evaluated.

import argparse
import datetime
import json

import paho.mqtt.client as mqtt

### MQTT Functions
def on_connect(mqttc, obj, flags, rc):
    print("Connected! - " + str(rc))

def on_message(mqttc, obj, msg):
    print(msg.topic+" "+str(msg.qos)+" "+str(msg.payload))

def on_publish(mqttc, obj, mid):
    print("Published! "+str(mid))

def on_subscribe(mqttc, obj, mid, granted_qos):
    print("Subscribed! - "+str(mid)+" "+str(granted_qos))

def on_log(mqttc, obj, level, string):
    print(string)

if __name__ == "__main__":
    # Handle args
    parser = argparse.ArgumentParser(
        description='This is to be used in conjunction with the WifiScanner on a Raspberry Pi')
    parser.add_argument('--topic', metavar='base/sub', type=str, nargs='?', help='Full topic to listen to. (Example "proximity/sensor")', default="proximity/#")
    parser.add_argument('--host', metavar='url', type=str, nargs='?',
                        help='URL of MQTT server.')
    parser.add_argument('--graph', metavar='True/False', type=bool, nargs='?', help='Whether to print the data.', default=True)
    parser.add_argument('--timeout', metavar='sec', type=int, nargs='?', help='How long the device will be remembered', default=10)
    args = parser.parse_args()
    # MQTT
    mqttc = mqtt.Client()
    # mqttc.on_message = on_message
    mqttc.on_connect = on_connect
    mqttc.on_publish = on_publish
    mqttc.on_subscribe = on_subscribe
    # Uncomment to enable debug messages
    #mqttc.on_log = on_log
    mqttc.connect(args.host, 1883, 60)
    mqttc.subscribe(args.topic, 0)
    # Start to listen
    while True:
        print(mqttc.loop())

The problem is that I can't see an easy way to pass the command-line arguments to the on_message callback, so I tried using the return value of .loop() instead. However, when I try to exit with Ctrl+Z (the only keyboard interrupt that works), the MQTT threads are not stopped and are left running.

The documentation and examples don't show how to handle messages outside the on_message callback, or how to exit cleanly.

Any help fixing this would be much appreciated.

Thanks in advance

Upvotes: 4

Views: 5929

Answers (3)

rishav

Reputation: 9

Here is another example of using classes with Paho.

import paho.mqtt.client as mqtt


class client1:

    def __init__(self, master):
        # Keep a reference to the MQTT client and register the bound
        # methods as callbacks, so they can reach state stored on self.
        self.master = master
        self.master.on_connect = self.on_connect
        self.master.on_message = self.on_message
        self.master.connect("test.mosquitto.org", 1883, 60)

    def on_connect(self, master, obj, flags, rc):
        # Subscribe once the connection is up.
        self.master.subscribe('/temperature123')

    def on_message(self, master, obj, msg):
        print(str(msg.payload))


client = mqtt.Client()
ob1 = client1(client)
client.loop_forever()

Upvotes: 0

ralight

Reputation: 11608

You could use the userdata argument from the Client() constructor. It ends up being passed to every callback.
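
A minimal sketch of that idea, assuming the parsed command-line values are collected into a plain dict. The `settings` dict, its keys, and the `make_client` helper are hypothetical names used only for illustration. The `CallbackAPIVersion` check is there because paho-mqtt 2.x requires that argument, while older releases do not have it.

```python
import time


def on_message(client, userdata, msg):
    # userdata is the object given to Client(userdata=...); here it is
    # a dict of settings (hypothetical keys: graph, timeout, last_seen).
    userdata["last_seen"][msg.topic] = time.time()
    if userdata["graph"]:
        print(msg.topic + " " + str(msg.qos) + " " + str(msg.payload))


def make_client(settings):
    # Imported here so on_message can be exercised without the library.
    import paho.mqtt.client as mqtt

    if hasattr(mqtt, "CallbackAPIVersion"):
        # paho-mqtt 2.x: the callback API version must be stated.
        client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1,
                             userdata=settings)
    else:
        # paho-mqtt 1.x
        client = mqtt.Client(userdata=settings)
    client.on_message = on_message
    return client
```

With the question's parser this might be called as `make_client({"graph": args.graph, "timeout": args.timeout, "last_seen": {}})`, followed by the usual connect and subscribe calls.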

Upvotes: 2

jnd

Reputation: 754

Fixed this, thanks to James Mills.

Placed the callbacks inside a class:

class Receiver:
    def __init__(self, graph, timeout):
        self.graph = graph
        self.timeout = timeout

    def on_connect(self, mqttc, obj, flags, rc):
        print("Connected! - " + str(rc))

    def on_message(self, mqttc, obj, msg):
        print(msg.topic+" "+str(msg.qos)+" "+str(msg.payload))

    def on_publish(self, mqttc, obj, mid):
        print("Published! "+str(mid))

    def on_subscribe(self, mqttc, obj, mid, granted_qos):
        print("Subscribed! - "+str(mid)+" "+str(granted_qos))

    def on_log(self, mqttc, obj, level, string):
        print(string)

and then set it up like this, passing in the parsed arguments:

receiver = Receiver(args.graph, args.timeout)
mqttc.on_message = receiver.on_message
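
For the clean-exit part of the question, one possible approach is sketched below: let `loop_forever()` do the network handling, catch Ctrl+C, and disconnect on the way out. Note that in a Unix shell Ctrl+Z normally suspends a process rather than ending it, which may be why the threads appeared to stay alive. The `run` function and the `count` attribute are hypothetical additions for illustration, and the `Receiver` here is trimmed to the one callback that matters.

```python
class Receiver:
    def __init__(self, graph, timeout):
        self.graph = graph
        self.timeout = timeout
        self.count = 0  # hypothetical: number of messages seen

    def on_message(self, mqttc, obj, msg):
        # The command-line values are available on self.
        self.count += 1
        if self.graph:
            print(msg.topic + " " + str(msg.qos) + " " + str(msg.payload))


def run(host, topic, receiver):
    # Imported here so Receiver can be exercised without the library.
    import paho.mqtt.client as mqtt

    if hasattr(mqtt, "CallbackAPIVersion"):
        # paho-mqtt 2.x: the callback API version must be stated.
        mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
    else:
        mqttc = mqtt.Client()
    mqttc.on_message = receiver.on_message
    mqttc.connect(host, 1883, 60)
    mqttc.subscribe(topic, 0)
    try:
        mqttc.loop_forever()  # blocks until disconnect or an exception
    except KeyboardInterrupt:
        pass  # Ctrl+C pressed
    finally:
        mqttc.disconnect()
```

It would be started with something like `run(args.host, args.topic, Receiver(args.graph, args.timeout))`.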

Upvotes: 0
