Torbo
Torbo

Reputation: 1

How to handle a lot of MQTT messages in very short time in python with paho, problems receiving correctly

I am using a raspberyy pi 4b (Raspbian OS, Bullseye) with a mosquitto broker running on it. In my Network there is a shelly device (shelly pm mini) that measures power and sends a MQTT Message whenever the power changes. On My Raspberry I want to receive the messages and store the data in a data bank to display them on a website later. Im using paho in Python (3.10) and subscribed to my topic with registering my on_message callback function. The Issue I am having is, that when 3 or 4 Messages sent over MQTT in a short period of time (1-10 ms), My code isn´t working. It seems like the messages are processed in my callback function in parallel instead of one after the other. This is a problem because this timing Issue leads to me not being able to process the received data correctly. I added a counter that should count up whenever the on_message function is called, however when the Messages arrive nearly at the same time (see the timestamp under this), The counter is not increased, which leads me into thinking the Process isn't going correctly one after another. This is My Callback function:

 def on_message(self, client, userdata, msg):
        timestamp_now = time.time()
        print(timestamp_now)
        print(self.counter)
        self.counter = self.counter +1

And this is My output when 3 Messages arrive with only ms apart.

1732657116.8512473 1

1732657116.8514547 1

1732657116.8516352 1

1732657116.8521552 1

1732657133.3355079 2

1732657133.3358626 2

1732657133.3359528 2

1732657137.4190025

1732657137.419096 3

1732657137.4203782 3

As cou can see the timestamps have different milicesond values, so they are different but the counter is not increased for each messsage, thats why i think the functions migth be called parallel instead of one call to the same function after the other.

I initially tried saving the received data into my databank, but i only wanted to save every 60 seconds, so i compared the newest timestamp to the last time saved and when this difference was over 60, i tried to save the data. However, whenn 3 Messages arrived Fast with nearly no time between them and the data was supposed to be saved beacause the 60 seconds were over, the data got saved three times (All 3 Sata from the 3 Messages got saved). Then i made the minimal example with the counter to test this and found out that something is not working as expected. I have no idea at this point.

Upvotes: -1

Views: 55

Answers (1)

Torbo
Torbo

Reputation: 1

the hint with the amount of active clients by Brits was very useful. In Fact, the Problem was no Timing Issue or something like that, there have been 4 clients running parallel in my Raspberry Pi. This was because: I am using MQTT combined with a flask web server. I also have the debug mode enabled, so when it did a restart, it created another instance of my MQTT Class and I had already two clients running. So to solve this I added Singleton for my classes that use MQTT and the clients went down from 4 to 2. The last thing was disabling the reloader in my flask app settings, because flask created two different threads, one for the main task and one for reloading and because of that it also created two clients. So now I only have one client running and it works as wanted. Thank you for your help! These are my Flask settings now:

app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=False, threaded=True)

And now I have this output:

1732867888.8439083
0
1732867919.9634273
1
1732867980.019778
2
1732868039.958098
3
1732868099.954424
4

And this is my Singleton Implemetation i used:

import threading


class Singleton(type):
    _instances = {}
    _singleton_locks: dict[any, threading.Lock] = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            if cls not in cls._singleton_locks:
                cls._singleton_locks[cls] = threading.Lock()
            with cls._singleton_locks[cls]:
                if cls not in cls._instances:
                    cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
        return cls._instances[cls]

Upvotes: 0

Related Questions